1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the job queue handling.
32
33 """
34
35 import logging
36 import errno
37 import time
38 import weakref
39 import threading
40 import itertools
41 import operator
42 import os
43
44 try:
45
46 from pyinotify import pyinotify
47 except ImportError:
48 import pyinotify
49
50 from ganeti import asyncnotifier
51 from ganeti import constants
52 from ganeti import serializer
53 from ganeti import locking
54 from ganeti import luxi
55 from ganeti import opcodes
56 from ganeti import opcodes_base
57 from ganeti import errors
58 from ganeti import mcpu
59 from ganeti import utils
60 from ganeti import jstore
61 import ganeti.rpc.node as rpc
62 from ganeti import runtime
63 from ganeti import netutils
64 from ganeti import compat
65 from ganeti import ht
66 from ganeti import query
67 from ganeti import qlang
68 from ganeti import pathutils
69 from ganeti import vcluster
70 from ganeti.cmdlib import cluster
71
72
73
74 _GetIdAttr = operator.attrgetter("id")
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised from within an executing opcode (see L{_OpExecCallbacks._CheckCancel})
  to abort the job; caught by the job processor, never propagated to callers.

  """
81
def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
91
99
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar stop_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    # Timestamps/priority may be missing in states written by older versions,
    # hence the defaults.
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
174
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar archived: Whether the job has been moved to the archive
  @ivar livelock: the livelock file of the process executing this job
  @ivar process_id: the PID of the process executing this job

  """

  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]
201
203 """Extend the reason trail
204
205 Add the reason for all the opcodes of this job to be executed.
206
207 """
208 count = 0
209 for queued_op in self.ops:
210 op = queued_op.input
211 if pickup:
212 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
213 else:
214 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
215 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
216 reason_src_prefix)
217 reason_text = "job=%d;index=%d" % (self.id, count)
218 reason = getattr(op, "reason", [])
219 reason.append((reason_src, reason_text, utils.EpochNano()))
220 op.reason = reason
221 count = count + 1
222
223 - def __init__(self, queue, job_id, ops, writable):
224 """Constructor for the _QueuedJob.
225
226 @type queue: L{JobQueue}
227 @param queue: our parent queue
228 @type job_id: job_id
229 @param job_id: our job id
230 @type ops: list
231 @param ops: the list of opcodes we hold, which will be encapsulated
232 in _QueuedOpCodes
233 @type writable: bool
234 @param writable: Whether job can be modified
235
236 """
237 if not ops:
238 raise errors.GenericError("A job needs at least one opcode")
239
240 self.queue = queue
241 self.id = int(job_id)
242 self.ops = [_QueuedOpCode(op) for op in ops]
243 self.AddReasons()
244 self.log_serial = 0
245 self.received_timestamp = TimeStampNow()
246 self.start_timestamp = None
247 self.end_timestamp = None
248 self.archived = False
249 self.livelock = None
250 self.process_id = None
251
252 self.writable = None
253
254 self._InitInMemory(self, writable)
255
256 assert not self.archived, "New jobs can not be marked as archived"
257
258 @staticmethod
260 """Initializes in-memory variables.
261
262 """
263 obj.writable = writable
264 obj.ops_iter = None
265 obj.cur_opctx = None
266
268 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
269 "id=%s" % self.id,
270 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
271
272 return "<%s at %#x>" % (" ".join(status), id(self))
273
274 @classmethod
275 - def Restore(cls, queue, state, writable, archived):
276 """Restore a _QueuedJob from serialized state:
277
278 @type queue: L{JobQueue}
279 @param queue: to which queue the restored job belongs
280 @type state: dict
281 @param state: the serialized state
282 @type writable: bool
283 @param writable: Whether job can be modified
284 @type archived: bool
285 @param archived: Whether job was already archived
286 @rtype: _JobQueue
287 @return: the restored _JobQueue instance
288
289 """
290 obj = _QueuedJob.__new__(cls)
291 obj.queue = queue
292 obj.id = int(state["id"])
293 obj.received_timestamp = state.get("received_timestamp", None)
294 obj.start_timestamp = state.get("start_timestamp", None)
295 obj.end_timestamp = state.get("end_timestamp", None)
296 obj.archived = archived
297 obj.livelock = state.get("livelock", None)
298 obj.process_id = state.get("process_id", None)
299 if obj.process_id is not None:
300 obj.process_id = int(obj.process_id)
301
302 obj.ops = []
303 obj.log_serial = 0
304 for op_state in state["ops"]:
305 op = _QueuedOpCode.Restore(op_state)
306 for log_entry in op.log:
307 obj.log_serial = max(obj.log_serial, log_entry[0])
308 obj.ops.append(op)
309
310 cls._InitInMemory(obj, writable)
311
312 return obj
313
315 """Serialize the _JobQueue instance.
316
317 @rtype: dict
318 @return: the serialized state
319
320 """
321 return {
322 "id": self.id,
323 "ops": [op.Serialize() for op in self.ops],
324 "start_timestamp": self.start_timestamp,
325 "end_timestamp": self.end_timestamp,
326 "received_timestamp": self.received_timestamp,
327 "livelock": self.livelock,
328 "process_id": self.process_id,
329 }
330
383
385 """Gets the current priority for this job.
386
387 Only unfinished opcodes are considered. When all are done, the default
388 priority is used.
389
390 @rtype: int
391
392 """
393 priorities = [op.priority for op in self.ops
394 if op.status not in constants.OPS_FINALIZED]
395
396 if not priorities:
397
398 return constants.OP_PRIO_DEFAULT
399
400 return min(priorities)
401
403 """Selectively returns the log entries.
404
405 @type newer_than: None or int
406 @param newer_than: if this is None, return all log entries,
407 otherwise return only the log entries with serial higher
408 than this value
409 @rtype: list
410 @return: the list of the log entries selected
411
412 """
413 if newer_than is None:
414 serial = -1
415 else:
416 serial = newer_than
417
418 entries = []
419 for op in self.ops:
420 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
421
422 return entries
423
425 """Mark unfinished opcodes with a given status and result.
426
427 This is an utility function for marking all running or waiting to
428 be run opcodes with a given status. Opcodes which are already
429 finalised are not changed.
430
431 @param status: a given opcode status
432 @param result: the opcode result
433
434 """
435 not_marked = True
436 for op in self.ops:
437 if op.status in constants.OPS_FINALIZED:
438 assert not_marked, "Finalized opcodes found after non-finalized ones"
439 continue
440 op.status = status
441 op.result = result
442 not_marked = False
443
445 """Marks the job as finalized.
446
447 """
448 self.end_timestamp = TimeStampNow()
449
474
476 """Changes the job priority.
477
478 @type priority: int
479 @param priority: New priority
480 @rtype: tuple; (bool, string)
481 @return: Boolean describing whether job's priority was successfully changed
482 and a text message
483
484 """
485 status = self.CalcStatus()
486
487 if status in constants.JOBS_FINALIZED:
488 return (False, "Job %s is finished" % self.id)
489 elif status == constants.JOB_STATUS_CANCELING:
490 return (False, "Job %s is cancelling" % self.id)
491 else:
492 assert status in (constants.JOB_STATUS_QUEUED,
493 constants.JOB_STATUS_WAITING,
494 constants.JOB_STATUS_RUNNING)
495
496 changed = False
497 for op in self.ops:
498 if (op.status == constants.OP_STATUS_RUNNING or
499 op.status in constants.OPS_FINALIZED):
500 assert not changed, \
501 ("Found opcode for which priority should not be changed after"
502 " priority has been changed for previous opcodes")
503 continue
504
505 assert op.status in (constants.OP_STATUS_QUEUED,
506 constants.OP_STATUS_WAITING)
507
508 changed = True
509
510
511 op.priority = priority
512
513 if changed:
514 return (True, ("Priorities of pending opcodes for job %s have been"
515 " changed to %s" % (self.id, priority)))
516 else:
517 return (False, "Job %s had no pending opcodes" % self.id)
518
520 """Sets the job's process ID
521
522 @type pid: int
523 @param pid: the process ID
524
525 """
526 status = self.CalcStatus()
527
528 if status in (constants.JOB_STATUS_QUEUED,
529 constants.JOB_STATUS_WAITING):
530 if self.process_id is not None:
531 logging.warning("Replacing the process id %s of job %s with %s",
532 self.process_id, self.id, pid)
533 self.process_id = pid
534 else:
535 logging.warning("Can set pid only for queued/waiting jobs")
536
539
class _OpExecCallbacks(mcpu.OpExecCbBase):
  # NOTE(review): base class reconstructed from the super() call below and
  # mcpu usage elsewhere in this file — confirm against upstream.

  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    super(_OpExecCallbacks, self).__init__()

    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op
560
562 """Raises an exception to cancel the job if asked to.
563
564 """
565
566 if self._op.status == constants.OP_STATUS_CANCELING:
567 logging.debug("Canceling opcode")
568 raise CancelJob()
569
571 """Mark the opcode as running, not lock-waiting.
572
573 This is called from the mcpu code as a notifier function, when the LU is
574 finally about to start the Exec() method. Of course, to have end-user
575 visible results, the opcode must be initially (before calling into
576 Processor.ExecOpCode) set to OP_STATUS_WAITING.
577
578 """
579 assert self._op in self._job.ops
580 assert self._op.status in (constants.OP_STATUS_WAITING,
581 constants.OP_STATUS_CANCELING)
582
583
584 self._CheckCancel()
585
586 logging.debug("Opcode is now running")
587
588 self._op.status = constants.OP_STATUS_RUNNING
589 self._op.exec_timestamp = TimeStampNow()
590
591
592 self._queue.UpdateJobUnlocked(self._job)
593
595 """Mark opcode again as lock-waiting.
596
597 This is called from the mcpu code just after calling PrepareRetry.
598 The opcode will now again acquire locks (more, hopefully).
599
600 """
601 self._op.status = constants.OP_STATUS_WAITING
602 logging.debug("Opcode will be retried. Back to waiting.")
603
605 """Internal feedback append function, with locks
606
607 """
608 self._job.log_serial += 1
609 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
610 self._queue.UpdateJobUnlocked(self._job, replicate=False)
611
628
640
642 """Submits jobs for processing.
643
644 See L{JobQueue.SubmitManyJobs}.
645
646 """
647
648 return self._queue.SubmitManyJobs(jobs)
649
661
665 """Initializes this class.
666
667 """
668 self._fn = fn
669 self._next = None
670
672 """Gets the next timeout if necessary.
673
674 """
675 if self._next is None:
676 self._next = self._fn()
677
679 """Returns the next timeout.
680
681 """
682 self._Advance()
683 return self._next
684
686 """Returns the current timeout and advances the internal state.
687
688 """
689 self._Advance()
690 result = self._next
691 self._next = None
692 return result
693
694
class _OpExecContext:
  """Execution context of a single opcode within a job.

  Tracks the opcode's index, log prefix, pending job dependencies and the
  lock-acquire timeout strategy.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
745
class _JobProcessor(object):
  """Executes a job's opcodes, one scheduling round at a time.

  """
  # Return values of __call__
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @param opexec_fn: Function executing an opcode
    @type job: L{_QueuedJob}
    @param job: Job to process
    @param _timeout_strategy_factory: Callable creating a lock-timeout
        strategy (for tests only)

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory
761
762 @staticmethod
764 """Locates the next opcode to run.
765
766 @type job: L{_QueuedJob}
767 @param job: Job object
768 @param timeout_strategy_factory: Callable to create new timeout strategy
769
770 """
771
772
773
774
775 if job.ops_iter is None:
776 job.ops_iter = enumerate(job.ops)
777
778
779 while True:
780 try:
781 (idx, op) = job.ops_iter.next()
782 except StopIteration:
783 raise errors.ProgrammerError("Called for a finished job")
784
785 if op.status == constants.OP_STATUS_RUNNING:
786
787 raise errors.ProgrammerError("Called for job marked as running")
788
789 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
790 timeout_strategy_factory)
791
792 if op.status not in constants.OPS_FINALIZED:
793 return opctx
794
795
796
797
798
799 logging.info("%s: opcode %s already processed, skipping",
800 opctx.log_prefix, opctx.summary)
801
802 @staticmethod
837
838 @staticmethod
840 """Checks if an opcode has dependencies and if so, processes them.
841
842 @type queue: L{JobQueue}
843 @param queue: Queue object
844 @type job: L{_QueuedJob}
845 @param job: Job object
846 @type opctx: L{_OpExecContext}
847 @param opctx: Opcode execution context
848 @rtype: bool
849 @return: Whether opcode will be re-scheduled by dependency tracker
850
851 """
852 op = opctx.op
853
854 result = False
855
856 while opctx.jobdeps:
857 (dep_job_id, dep_status) = opctx.jobdeps[0]
858
859 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
860 dep_status)
861 assert ht.TNonEmptyString(depmsg), "No dependency message"
862
863 logging.info("%s: %s", opctx.log_prefix, depmsg)
864
865 if depresult == _JobDependencyManager.CONTINUE:
866
867 opctx.jobdeps.pop(0)
868
869 elif depresult == _JobDependencyManager.WAIT:
870
871
872 result = True
873 break
874
875 elif depresult == _JobDependencyManager.CANCEL:
876
877 job.Cancel()
878 assert op.status == constants.OP_STATUS_CANCELING
879 break
880
881 elif depresult in (_JobDependencyManager.WRONGSTATUS,
882 _JobDependencyManager.ERROR):
883
884 op.status = constants.OP_STATUS_ERROR
885 op.result = _EncodeOpError(errors.OpExecError(depmsg))
886 break
887
888 else:
889 raise errors.ProgrammerError("Unknown dependency result '%s'" %
890 depresult)
891
892 return result
893
895 """Processes one opcode and returns the result.
896
897 """
898 op = opctx.op
899
900 assert op.status in (constants.OP_STATUS_WAITING,
901 constants.OP_STATUS_CANCELING)
902
903
904 if op.status == constants.OP_STATUS_CANCELING:
905 return (constants.OP_STATUS_CANCELING, None)
906
907 timeout = opctx.GetNextLockTimeout()
908
909 try:
910
911 result = self.opexec_fn(op.input,
912 _OpExecCallbacks(self.queue, self.job, op),
913 timeout=timeout)
914 except mcpu.LockAcquireTimeout:
915 assert timeout is not None, "Received timeout for blocking acquire"
916 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
917
918 assert op.status in (constants.OP_STATUS_WAITING,
919 constants.OP_STATUS_CANCELING)
920
921
922 if op.status == constants.OP_STATUS_CANCELING:
923 return (constants.OP_STATUS_CANCELING, None)
924
925
926 return (constants.OP_STATUS_WAITING, None)
927 except CancelJob:
928 logging.exception("%s: Canceling job", opctx.log_prefix)
929 assert op.status == constants.OP_STATUS_CANCELING
930 return (constants.OP_STATUS_CANCELING, None)
931
932 except Exception, err:
933 logging.exception("%s: Caught exception in %s",
934 opctx.log_prefix, opctx.summary)
935 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
936 else:
937 logging.debug("%s: %s successful",
938 opctx.log_prefix, opctx.summary)
939 return (constants.OP_STATUS_SUCCESS, result)
940
942 """Continues execution of a job.
943
944 @param _nextop_fn: Callback function for tests
945 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
946 be deferred and C{WAITDEP} if the dependency manager
947 (L{_JobDependencyManager}) will re-schedule the job when appropriate
948
949 """
950 queue = self.queue
951 job = self.job
952
953 logging.debug("Processing job %s", job.id)
954
955 try:
956 opcount = len(job.ops)
957
958 assert job.writable, "Expected writable job"
959
960
961 if job.CalcStatus() in constants.JOBS_FINALIZED:
962 return self.FINISHED
963
964
965 if job.cur_opctx:
966 opctx = job.cur_opctx
967 job.cur_opctx = None
968 else:
969 if __debug__ and _nextop_fn:
970 _nextop_fn()
971 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
972
973 op = opctx.op
974
975
976 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
977 constants.OP_STATUS_CANCELING)
978 for i in job.ops[opctx.index + 1:])
979
980 assert op.status in (constants.OP_STATUS_QUEUED,
981 constants.OP_STATUS_WAITING,
982 constants.OP_STATUS_CANCELING)
983
984 assert (op.priority <= constants.OP_PRIO_LOWEST and
985 op.priority >= constants.OP_PRIO_HIGHEST)
986
987 waitjob = None
988
989 if op.status != constants.OP_STATUS_CANCELING:
990 assert op.status in (constants.OP_STATUS_QUEUED,
991 constants.OP_STATUS_WAITING)
992
993
994 if self._MarkWaitlock(job, op):
995
996 queue.UpdateJobUnlocked(job)
997
998 assert op.status == constants.OP_STATUS_WAITING
999 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1000 assert job.start_timestamp and op.start_timestamp
1001 assert waitjob is None
1002
1003
1004 waitjob = self._CheckDependencies(queue, job, opctx)
1005
1006 assert op.status in (constants.OP_STATUS_WAITING,
1007 constants.OP_STATUS_CANCELING,
1008 constants.OP_STATUS_ERROR)
1009
1010 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1011 constants.OP_STATUS_ERROR)):
1012 logging.info("%s: opcode %s waiting for locks",
1013 opctx.log_prefix, opctx.summary)
1014
1015 assert not opctx.jobdeps, "Not all dependencies were removed"
1016
1017 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1018
1019 op.status = op_status
1020 op.result = op_result
1021
1022 assert not waitjob
1023
1024 if op.status in (constants.OP_STATUS_WAITING,
1025 constants.OP_STATUS_QUEUED):
1026
1027
1028 assert not op.end_timestamp
1029 else:
1030
1031 op.end_timestamp = TimeStampNow()
1032
1033 if op.status == constants.OP_STATUS_CANCELING:
1034 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1035 for i in job.ops[opctx.index:])
1036 else:
1037 assert op.status in constants.OPS_FINALIZED
1038
1039 if op.status == constants.OP_STATUS_QUEUED:
1040
1041 assert not waitjob
1042
1043 finalize = False
1044
1045
1046 job.cur_opctx = None
1047
1048
1049 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1050
1051 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1052 finalize = False
1053
1054 if not waitjob and opctx.CheckPriorityIncrease():
1055
1056 queue.UpdateJobUnlocked(job)
1057
1058
1059 job.cur_opctx = opctx
1060
1061 assert (op.priority <= constants.OP_PRIO_LOWEST and
1062 op.priority >= constants.OP_PRIO_HIGHEST)
1063
1064
1065 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1066
1067 else:
1068
1069 assert (opctx.index == 0 or
1070 compat.all(i.status == constants.OP_STATUS_SUCCESS
1071 for i in job.ops[:opctx.index]))
1072
1073
1074 job.cur_opctx = None
1075
1076 if op.status == constants.OP_STATUS_SUCCESS:
1077 finalize = False
1078
1079 elif op.status == constants.OP_STATUS_ERROR:
1080
1081
1082
1083
1084 to_encode = errors.OpExecError("Preceding opcode failed")
1085 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1086 _EncodeOpError(to_encode))
1087 finalize = True
1088 elif op.status == constants.OP_STATUS_CANCELING:
1089 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090 "Job canceled by request")
1091 finalize = True
1092
1093 else:
1094 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095
1096 if opctx.index == (opcount - 1):
1097
1098 finalize = True
1099
1100 if finalize:
1101
1102 job.Finalize()
1103
1104
1105
1106 queue.UpdateJobUnlocked(job)
1107
1108 assert not waitjob
1109
1110 if finalize:
1111 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1112 return self.FINISHED
1113
1114 assert not waitjob or queue.depmgr.JobWaiting(job)
1115
1116 if waitjob:
1117 return self.WAITDEP
1118 else:
1119 return self.DEFER
1120 finally:
1121 assert job.writable, "Job became read-only while being processed"
1122
class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn):
    """Initializes this class.

    @param getstatus_fn: Callable returning a job's status given its ID

    """
    self._getstatus_fn = getstatus_fn

    # Maps a dependency job ID to the set of jobs waiting on it
    self._waiters = {}

  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost as err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]
1213
class JobQueue(object):
  """Queue used to manage the jobs.

  """
1220 """Constructor for JobQueue.
1221
1222 The constructor will initialize the job queue object and then
1223 start loading the current jobs from disk, either for starting them
1224 (if they were queue) or for aborting them (if they were already
1225 running).
1226
1227 @type context: GanetiContext
1228 @param context: the context object for access to the configuration
1229 data and other ganeti objects
1230
1231 """
1232 self.context = context
1233 self._memcache = weakref.WeakValueDictionary()
1234 self._my_hostname = netutils.Hostname.GetSysName()
1235
1236
1237 self._nodes = dict((n.name, n.primary_ip)
1238 for n in cfg.GetAllNodesInfo().values()
1239 if n.master_candidate)
1240
1241
1242 self._nodes.pop(self._my_hostname, None)
1243
1244
1245 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
1246
1248 """Gets RPC runner with context.
1249
1250 """
1251 return rpc.JobQueueRunner(self.context, address_list)
1252
1253 @staticmethod
1255 """Verifies the status of an RPC call.
1256
1257 Since we aim to keep consistency should this node (the current
1258 master) fail, we will log errors if our rpc fail, and especially
1259 log the case when more than half of the nodes fails.
1260
1261 @param result: the data as returned from the rpc call
1262 @type nodes: list
1263 @param nodes: the list of nodes we made the call to
1264 @type failmsg: str
1265 @param failmsg: the identifier to be used for logging
1266
1267 """
1268 failed = []
1269 success = []
1270
1271 for node in nodes:
1272 msg = result[node].fail_msg
1273 if msg:
1274 failed.append(node)
1275 logging.error("RPC call %s (%s) failed on node %s: %s",
1276 result[node].call, failmsg, node, msg)
1277 else:
1278 success.append(node)
1279
1280
1281 if (len(success) + 1) < len(failed):
1282
1283 logging.error("More than half of the nodes failed")
1284
1286 """Helper for returning the node name/ip list.
1287
1288 @rtype: (list, list)
1289 @return: a tuple of two lists, the first one with the node
1290 names and the second one with the node addresses
1291
1292 """
1293
1294 name_list = self._nodes.keys()
1295 addr_list = [self._nodes[name] for name in name_list]
1296 return name_list, addr_list
1297
1299 """Writes a file locally and then replicates it to all nodes.
1300
1301 This function will replace the contents of a file on the local
1302 node and then replicate it to all the other nodes we have.
1303
1304 @type file_name: str
1305 @param file_name: the path of the file to be replicated
1306 @type data: str
1307 @param data: the new contents of the file
1308 @type replicate: boolean
1309 @param replicate: whether to spread the changes to the remote nodes
1310
1311 """
1312 getents = runtime.GetEnts()
1313 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1314 gid=getents.daemons_gid,
1315 mode=constants.JOB_QUEUE_FILES_PERMS)
1316
1317 if replicate:
1318 names, addrs = self._GetNodeIp()
1319 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1320 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1321
1323 """Renames a file locally and then replicate the change.
1324
1325 This function will rename a file in the local queue directory
1326 and then replicate this rename to all the other nodes we have.
1327
1328 @type rename: list of (old, new)
1329 @param rename: List containing tuples mapping old to new names
1330
1331 """
1332
1333 for old, new in rename:
1334 utils.RenameFile(old, new, mkdir=True)
1335
1336
1337 names, addrs = self._GetNodeIp()
1338 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1339 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1340
1341 @staticmethod
1343 """Returns the job file for a given job id.
1344
1345 @type job_id: str
1346 @param job_id: the job identifier
1347 @rtype: str
1348 @return: the path to the job file
1349
1350 """
1351 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1352
1353 @staticmethod
1355 """Returns the archived job file for a give job id.
1356
1357 @type job_id: str
1358 @param job_id: the job identifier
1359 @rtype: str
1360 @return: the path to the archived job file
1361
1362 """
1363 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1364 jstore.GetArchiveDirectory(job_id),
1365 "job-%s" % job_id)
1366
1367 @staticmethod
1369 """Build list of directories containing job files.
1370
1371 @type archived: bool
1372 @param archived: Whether to include directories for archived jobs
1373 @rtype: list
1374
1375 """
1376 result = [pathutils.QUEUE_DIR]
1377
1378 if archived:
1379 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1380 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1381 utils.ListVisibleFiles(archive_path)))
1382
1383 return result
1384
1385 @classmethod
1387 """Return all known job IDs.
1388
1389 The method only looks at disk because it's a requirement that all
1390 jobs are present on disk (so in the _memcache we don't have any
1391 extra IDs).
1392
1393 @type sort: boolean
1394 @param sort: perform sorting on the returned job ids
1395 @rtype: list
1396 @return: the list of job IDs
1397
1398 """
1399 jlist = []
1400
1401 for path in cls._DetermineJobDirectories(archived):
1402 for filename in utils.ListVisibleFiles(path):
1403 m = constants.JOB_FILE_RE.match(filename)
1404 if m:
1405 jlist.append(int(m.group(1)))
1406
1407 if sort:
1408 jlist.sort()
1409 return jlist
1410
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    # Fast path: job already cached in memory
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      # Only the live queue is consulted (try_archived=False); archived
      # jobs are never added to the writable cache
      job = JobQueue._LoadJobFromDisk(self, job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # Job is already archived; nothing more we can do with it
        logging.exception("Can't parse job %s", job_id)
      else:
        # Move the corrupted job file out of the live queue so it's not
        # picked up again
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1453
1454 @staticmethod
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type queue: L{JobQueue}
    @param queue: job queue the restored job will belong to
    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: whether the restored job should be writable; if None,
      it defaults to True for live jobs and False for archived jobs
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object
    @raise errors.JobFileCorrupted: if the job file exists but its contents
      cannot be deserialized or restored

    """
    # Candidate locations, tried in order: live queue first, then
    # (optionally) the archive
    path_functions = [(JobQueue._GetJobPath, False)]

    if try_archived:
      path_functions.append((JobQueue._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        # A missing file just means "try the next location"; any other
        # I/O error is propagated
        if err.errno != errno.ENOENT:
          raise
      else:
        # File read successfully; stop at the first existing location
        break

    if not raw_data:
      logging.debug("No data available for job %s", job_id)
      return None

    if writable is None:
      # Archived jobs must not be modified
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(queue, data, writable, archived)
    except Exception, err:
      # Any parse/restore failure is reported uniformly to callers
      raise errors.JobFileCorrupted(err)

    return job
1501
1502 @staticmethod
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type queue: L{JobQueue}
    @param queue: job queue the restored job will belong to
    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: forwarded unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return JobQueue._LoadJobFromDisk(queue, job_id, try_archived,
                                       writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      # "Safe" variant: errors are logged and a missing/corrupted job is
      # reported to the caller simply as nonexistent
      logging.exception("Can't load/parse job %s", job_id)
      return None
1524
1525 @classmethod
1531
1532 @staticmethod
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies; list of C{(job_id, status)} tuples where
      the job ID may be relative (negative)
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        # Relative job IDs are negative integers
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort on the first dependency that can't be resolved
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        # Absolute job ID, use as-is
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
1562
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @return: the job's status as computed by L{_QueuedJob.CalcStatus}
    @raise errors.JobLost: If job can't be found

    """
    # The in-memory cache is deliberately bypassed; the job is loaded
    # read-only straight from disk, archived jobs included
    job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)
1583
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      # Invariant: a job is finalized if and only if its end timestamp
      # has been set
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
1607
1629
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple; (boolean, string)
    @return: success flag and message, as returned by
      L{_ModifyJobUnlocked}

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1642
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @rtype: tuple; (boolean, string)
    @return: success flag and message, as returned by
      L{_ModifyJobUnlocked}
    @raise errors.GenericError: if the priority is not one of the valid
      submit priorities

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    # Validate before touching the job at all
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      # Delegate the actual change to the job object
      (success, msg) = job.ChangePriority(priority)
      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
1664
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)
    @rtype: tuple; (boolean, string)
    @return: the (status, message) tuple returned by C{mod_fn}, or a
      failure tuple if the job could not be loaded

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # Only persist (and replicate) the job if the modification actually
      # took effect
      self.UpdateJobUnlocked(job)

    return (success, msg)
1691