31 """Module implementing the job queue handling.
32
33 """
34
35 import logging
36 import errno
37 import time
38 import weakref
39 import threading
40 import itertools
41 import operator
42 import os
43
44 try:
45
46 from pyinotify import pyinotify
47 except ImportError:
48 import pyinotify
49
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster
from ganeti.cmdlib import cluster


_GetIdAttr = operator.attrgetter("id")
78 """Special exception to cancel a job.
79
80 """
81
84 """Returns the current timestamp.
85
86 @rtype: tuple
87 @return: the current time in the (seconds, microseconds) format
88
89 """
90 return utils.SplitTime(time.time())
91

def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeError(to_encode)

102 """Encapsulates an opcode object.
103
104 @ivar log: holds the execution log and consists of tuples
105 of the form C{(log_serial, timestamp, level, message)}
106 @ivar input: the OpCode we encapsulate
107 @ivar status: the current status
108 @ivar result: the result of the LU execution
109 @ivar start_timestamp: timestamp for the start of the execution
110 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
111 @ivar stop_timestamp: timestamp for the end of the execution
112
113 """
114 __slots__ = ["input", "status", "result", "log", "priority",
115 "start_timestamp", "exec_timestamp", "end_timestamp",
116 "__weakref__"]
117
119 """Initializes instances of this class.
120
121 @type op: L{opcodes.OpCode}
122 @param op: the opcode we encapsulate
123
124 """
125 self.input = op
126 self.status = constants.OP_STATUS_QUEUED
127 self.result = None
128 self.log = []
129 self.start_timestamp = None
130 self.exec_timestamp = None
131 self.end_timestamp = None
132
133
134 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
135
  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

158 """Serializes this _QueuedOpCode.
159
160 @rtype: dict
161 @return: the dictionary holding the serialized state
162
163 """
164 return {
165 "input": self.input.__getstate__(),
166 "status": self.status,
167 "result": self.result,
168 "log": self.log,
169 "start_timestamp": self.start_timestamp,
170 "exec_timestamp": self.exec_timestamp,
171 "end_timestamp": self.end_timestamp,
172 "priority": self.priority,
173 }
174
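  # Serialize() and Restore() are inverses: an opcode can be round-tripped
  # through a plain dict, e.g. _QueuedOpCode.Restore(op.Serialize()), which
  # is what happens when a job file is written out as JSON and read back.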
177 """In-memory job representation.
178
179 This is what we use to track the user-submitted jobs. Locking must
180 be taken care of by users of this class.
181
182 @type queue: L{JobQueue}
183 @ivar queue: the parent queue
184 @ivar id: the job ID
185 @type ops: list
186 @ivar ops: the list of _QueuedOpCode that constitute the job
187 @type log_serial: int
188 @ivar log_serial: holds the index for the next log entry
189 @ivar received_timestamp: the timestamp for when the job was received
190 @ivar start_timestmap: the timestamp for start of execution
191 @ivar end_timestamp: the timestamp for end of execution
192 @ivar writable: Whether the job is allowed to be modified
193
194 """
195
196 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
197 "received_timestamp", "start_timestamp", "end_timestamp",
198 "writable", "archived",
199 "livelock", "process_id",
200 "__weakref__"]
201
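  # Illustrative example for AddReasons() below: for job 123, the second
  # opcode gets a trail entry of the form
  #   (<reason source>, "job=123;index=1", <timestamp in nanoseconds>)
  # where the reason source prefix depends on whether the job was newly
  # submitted or picked up again by the queue (pickup=True).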
203 """Extend the reason trail
204
205 Add the reason for all the opcodes of this job to be executed.
206
207 """
208 count = 0
209 for queued_op in self.ops:
210 op = queued_op.input
211 if pickup:
212 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
213 else:
214 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
215 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
216 reason_src_prefix)
217 reason_text = "job=%d;index=%d" % (self.id, count)
218 reason = getattr(op, "reason", [])
219 reason.append((reason_src, reason_text, utils.EpochNano()))
220 op.reason = reason
221 count = count + 1
222
  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False
    self.livelock = None
    self.process_id = None

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived
    obj.livelock = state.get("livelock", None)
    obj.process_id = state.get("process_id", None)
    if obj.process_id is not None:
      obj.process_id = int(obj.process_id)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

313 """Serialize the _JobQueue instance.
314
315 @rtype: dict
316 @return: the serialized state
317
318 """
319 return {
320 "id": self.id,
321 "ops": [op.Serialize() for op in self.ops],
322 "start_timestamp": self.start_timestamp,
323 "end_timestamp": self.end_timestamp,
324 "received_timestamp": self.received_timestamp,
325 "livelock": self.livelock,
326 "process_id": self.process_id,
327 }
328
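  # Reminder on priority ordering: numerically lower values mean higher
  # priority (OP_PRIO_HIGHEST <= priority <= OP_PRIO_LOWEST), which is why
  # CalcPriority() below uses min() over the pending opcodes.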
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

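  # Example for GetLogEntries() below: with entries carrying serials 1..5,
  # GetLogEntries(3) returns only the entries with serials 4 and 5, while
  # GetLogEntries(None) returns all of them.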
401 """Selectively returns the log entries.
402
403 @type newer_than: None or int
404 @param newer_than: if this is None, return all log entries,
405 otherwise return only the log entries with serial higher
406 than this value
407 @rtype: list
408 @return: the list of the log entries selected
409
410 """
411 if newer_than is None:
412 serial = -1
413 else:
414 serial = newer_than
415
416 entries = []
417 for op in self.ops:
418 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
419
420 return entries
421
423 """Mark unfinished opcodes with a given status and result.
424
425 This is an utility function for marking all running or waiting to
426 be run opcodes with a given status. Opcodes which are already
427 finalised are not changed.
428
429 @param status: a given opcode status
430 @param result: the opcode result
431
432 """
433 not_marked = True
434 for op in self.ops:
435 if op.status in constants.OPS_FINALIZED:
436 assert not_marked, "Finalized opcodes found after non-finalized ones"
437 continue
438 op.status = status
439 op.result = result
440 not_marked = False
441
443 """Marks the job as finalized.
444
445 """
446 self.end_timestamp = TimeStampNow()
447
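  # ChangePriority() below only touches opcodes that are still queued or
  # waiting; a running or finalized opcode keeps the priority it already
  # has. The (success, message) tuple it returns is passed back verbatim
  # by JobQueue.ChangeJobPriority().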
  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
        and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)

  def SetPid(self, pid):
    """Sets the job's process ID.

    @type pid: int
    @param pid: the process ID

    """
    status = self.CalcStatus()

    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_WAITING):
      if self.process_id is not None:
        logging.warning("Replacing the process id %s of job %s with %s",
                        self.process_id, self.id, pid)
      self.process_id = pid
    else:
      logging.warning("Can set pid only for queued/waiting jobs")


class _OpExecCallbacks(mcpu.OpExecCbBase):

  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    super(_OpExecCallbacks, self).__init__()

    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

560 """Raises an exception to cancel the job if asked to.
561
562 """
563
564 if self._op.status == constants.OP_STATUS_CANCELING:
565 logging.debug("Canceling opcode")
566 raise CancelJob()
567
569 """Mark the opcode as running, not lock-waiting.
570
571 This is called from the mcpu code as a notifier function, when the LU is
572 finally about to start the Exec() method. Of course, to have end-user
573 visible results, the opcode must be initially (before calling into
574 Processor.ExecOpCode) set to OP_STATUS_WAITING.
575
576 """
577 assert self._op in self._job.ops
578 assert self._op.status in (constants.OP_STATUS_WAITING,
579 constants.OP_STATUS_CANCELING)
580
581
582 self._CheckCancel()
583
584 logging.debug("Opcode is now running")
585
586 self._op.status = constants.OP_STATUS_RUNNING
587 self._op.exec_timestamp = TimeStampNow()
588
589
590 self._queue.UpdateJobUnlocked(self._job)
591
593 """Mark opcode again as lock-waiting.
594
595 This is called from the mcpu code just after calling PrepareRetry.
596 The opcode will now again acquire locks (more, hopefully).
597
598 """
599 self._op.status = constants.OP_STATUS_WAITING
600 logging.debug("Opcode will be retried. Back to waiting.")
601
603 """Internal feedback append function, with locks
604
605 """
606 self._job.log_serial += 1
607 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
608 self._queue.UpdateJobUnlocked(self._job, replicate=False)
609
  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

670 """Gets the next timeout if necessary.
671
672 """
673 if self._next is None:
674 self._next = self._fn()
675
677 """Returns the next timeout.
678
679 """
680 self._Advance()
681 return self._next
682
684 """Returns the current timeout and advances the internal state.
685
686 """
687 self._Advance()
688 result = self._next
689 self._next = None
690 return result
691
692
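# A _TimeoutStrategyWrapper caches one timeout value: Peek() computes and
# remembers the next timeout without consuming it, while Next() consumes it.
# This lets _OpExecContext.CheckPriorityIncrease() test for exhaustion
# (Peek() returning None) without skipping an attempt.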

class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

713 """Creates a new timeout strategy.
714
715 """
716 self._timeout_strategy = \
717 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
718
720 """Checks whether priority can and should be increased.
721
722 Called when locks couldn't be acquired.
723
724 """
725 op = self.op
726
727
728
729 if (self._timeout_strategy.Peek() is None and
730 op.priority > constants.OP_PRIO_HIGHEST):
731 logging.debug("Increasing priority")
732 op.priority -= 1
733 self._ResetTimeoutStrategy()
734 return True
735
736 return False
737
739 """Returns the next lock acquire timeout.
740
741 """
742 return self._timeout_strategy.Next()
743
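# The processor below is called repeatedly for a single job; every call
# processes at most one opcode and reports back through the three class
# constants: FINISHED (job finalized), DEFER (re-schedule, e.g. after a lock
# timeout) or WAITDEP (the dependency manager re-adds the job later).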

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # ops_iter is kept on the job object and serves as a cache of the
    # position reached so far, so repeated calls continue where the
    # previous one stopped
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before a master daemon
      # shutdown, so some opcodes can be expected to be already completed
      # successfully (if any had errored out, the whole job should have
      # been aborted and not resubmitted for processing)
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for job
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

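  # Lock acquisition in _ExecOpCodeUnlocked() below uses a growing sequence
  # of timeouts; once the strategy is exhausted, GetNextLockTimeout() yields
  # None, meaning a blocking acquire (hence the assertion that a timeout
  # exception is never seen for a blocking acquire).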
893 """Processes one opcode and returns the result.
894
895 """
896 op = opctx.op
897
898 assert op.status in (constants.OP_STATUS_WAITING,
899 constants.OP_STATUS_CANCELING)
900
901
902 if op.status == constants.OP_STATUS_CANCELING:
903 return (constants.OP_STATUS_CANCELING, None)
904
905 timeout = opctx.GetNextLockTimeout()
906
907 try:
908
909 result = self.opexec_fn(op.input,
910 _OpExecCallbacks(self.queue, self.job, op),
911 timeout=timeout)
912 except mcpu.LockAcquireTimeout:
913 assert timeout is not None, "Received timeout for blocking acquire"
914 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
915
916 assert op.status in (constants.OP_STATUS_WAITING,
917 constants.OP_STATUS_CANCELING)
918
919
920 if op.status == constants.OP_STATUS_CANCELING:
921 return (constants.OP_STATUS_CANCELING, None)
922
923
924 return (constants.OP_STATUS_WAITING, None)
925 except CancelJob:
926 logging.exception("%s: Canceling job", opctx.log_prefix)
927 assert op.status == constants.OP_STATUS_CANCELING
928 return (constants.OP_STATUS_CANCELING, None)
929
930 except Exception, err:
931 logging.exception("%s: Caught exception in %s",
932 opctx.log_prefix, opctx.summary)
933 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
934 else:
935 logging.debug("%s: %s successful",
936 opctx.log_prefix, opctx.summary)
937 return (constants.OP_STATUS_SUCCESS, result)
938
940 """Continues execution of a job.
941
942 @param _nextop_fn: Callback function for tests
943 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
944 be deferred and C{WAITDEP} if the dependency manager
945 (L{_JobDependencyManager}) will re-schedule the job when appropriate
946
947 """
948 queue = self.queue
949 job = self.job
950
951 logging.debug("Processing job %s", job.id)
952
953 try:
954 opcount = len(job.ops)
955
956 assert job.writable, "Expected writable job"
957
958
959 if job.CalcStatus() in constants.JOBS_FINALIZED:
960 return self.FINISHED
961
962
963 if job.cur_opctx:
964 opctx = job.cur_opctx
965 job.cur_opctx = None
966 else:
967 if __debug__ and _nextop_fn:
968 _nextop_fn()
969 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
970
971 op = opctx.op
972
973
974 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
975 constants.OP_STATUS_CANCELING)
976 for i in job.ops[opctx.index + 1:])
977
978 assert op.status in (constants.OP_STATUS_QUEUED,
979 constants.OP_STATUS_WAITING,
980 constants.OP_STATUS_CANCELING)
981
982 assert (op.priority <= constants.OP_PRIO_LOWEST and
983 op.priority >= constants.OP_PRIO_HIGHEST)
984
985 waitjob = None
986
987 if op.status != constants.OP_STATUS_CANCELING:
988 assert op.status in (constants.OP_STATUS_QUEUED,
989 constants.OP_STATUS_WAITING)
990
991
992 if self._MarkWaitlock(job, op):
993
994 queue.UpdateJobUnlocked(job)
995
996 assert op.status == constants.OP_STATUS_WAITING
997 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
998 assert job.start_timestamp and op.start_timestamp
999 assert waitjob is None
1000
1001
1002 waitjob = self._CheckDependencies(queue, job, opctx)
1003
1004 assert op.status in (constants.OP_STATUS_WAITING,
1005 constants.OP_STATUS_CANCELING,
1006 constants.OP_STATUS_ERROR)
1007
1008 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1009 constants.OP_STATUS_ERROR)):
1010 logging.info("%s: opcode %s waiting for locks",
1011 opctx.log_prefix, opctx.summary)
1012
1013 assert not opctx.jobdeps, "Not all dependencies were removed"
1014
1015 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1016
1017 op.status = op_status
1018 op.result = op_result
1019
1020 assert not waitjob
1021
1022 if op.status in (constants.OP_STATUS_WAITING,
1023 constants.OP_STATUS_QUEUED):
1024
1025
1026 assert not op.end_timestamp
1027 else:
1028
1029 op.end_timestamp = TimeStampNow()
1030
1031 if op.status == constants.OP_STATUS_CANCELING:
1032 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1033 for i in job.ops[opctx.index:])
1034 else:
1035 assert op.status in constants.OPS_FINALIZED
1036
1037 if op.status == constants.OP_STATUS_QUEUED:
1038
1039 assert not waitjob
1040
1041 finalize = False
1042
1043
1044 job.cur_opctx = None
1045
1046
1047 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1048
1049 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1050 finalize = False
1051
1052 if not waitjob and opctx.CheckPriorityIncrease():
1053
1054 queue.UpdateJobUnlocked(job)
1055
1056
1057 job.cur_opctx = opctx
1058
1059 assert (op.priority <= constants.OP_PRIO_LOWEST and
1060 op.priority >= constants.OP_PRIO_HIGHEST)
1061
1062
1063 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1064
1065 else:
1066
1067 assert (opctx.index == 0 or
1068 compat.all(i.status == constants.OP_STATUS_SUCCESS
1069 for i in job.ops[:opctx.index]))
1070
1071
1072 job.cur_opctx = None
1073
1074 if op.status == constants.OP_STATUS_SUCCESS:
1075 finalize = False
1076
1077 elif op.status == constants.OP_STATUS_ERROR:
1078
1079
1080
1081
1082 to_encode = errors.OpExecError("Preceding opcode failed")
1083 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1084 _EncodeOpError(to_encode))
1085 finalize = True
1086 elif op.status == constants.OP_STATUS_CANCELING:
1087 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088 "Job canceled by request")
1089 finalize = True
1090
1091 else:
1092 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1093
1094 if opctx.index == (opcount - 1):
1095
1096 finalize = True
1097
1098 if finalize:
1099
1100 job.Finalize()
1101
1102
1103
1104 queue.UpdateJobUnlocked(job)
1105
1106 assert not waitjob
1107
1108 if finalize:
1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110 return self.FINISHED
1111
1112 assert not waitjob or queue.depmgr.JobWaiting(job)
1113
1114 if waitjob:
1115 return self.WAITDEP
1116 else:
1117 return self.DEFER
1118 finally:
1119 assert job.writable, "Job became read-only while being processed"
1120
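# A minimal sketch of how a scheduler might drive the processor (assuming an
# external loop; the actual scheduling is done by the job queue's workers):
#
#   proc = _JobProcessor(queue, opexec_fn, job)
#   while True:
#     result = proc()
#     if result == _JobProcessor.FINISHED:
#       break                      # job reached a finalized status
#     elif result == _JobProcessor.WAITDEP:
#       break                      # depmgr will re-schedule the job
#     else:
#       continue                   # DEFER: put the job back in the queue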
1123 """Keeps track of job dependencies.
1124
1125 """
1126 (WAIT,
1127 ERROR,
1128 CANCEL,
1129 CONTINUE,
1130 WRONGSTATUS) = range(1, 6)
1131
1133 """Initializes this class.
1134
1135 """
1136 self._getstatus_fn = getstatus_fn
1137
1138 self._waiters = {}
1139
1141 """Checks if a job is waiting.
1142
1143 """
1144 return compat.any(job in jobs
1145 for jobs in self._waiters.values())
1146
1148 """Checks if a dependency job has the requested status.
1149
1150 If the other job is not yet in a finalized status, the calling job will be
1151 notified (re-added to the workerpool) at a later point.
1152
1153 @type job: L{_QueuedJob}
1154 @param job: Job object
1155 @type dep_job_id: int
1156 @param dep_job_id: ID of dependency job
1157 @type dep_status: list
1158 @param dep_status: Required status
1159
1160 """
1161 assert ht.TJobId(job.id)
1162 assert ht.TJobId(dep_job_id)
1163 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1164
1165 if job.id == dep_job_id:
1166 return (self.ERROR, "Job can't depend on itself")
1167
1168
1169 try:
1170 status = self._getstatus_fn(dep_job_id)
1171 except errors.JobLost, err:
1172 return (self.ERROR, "Dependency error: %s" % err)
1173
1174 assert status in constants.JOB_STATUS_ALL
1175
1176 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1177
1178 if status not in constants.JOBS_FINALIZED:
1179
1180 job_id_waiters.add(job)
1181 return (self.WAIT,
1182 "Need to wait for job %s, wanted status '%s'" %
1183 (dep_job_id, dep_status))
1184
1185
1186 if job in job_id_waiters:
1187 job_id_waiters.remove(job)
1188
1189 if (status == constants.JOB_STATUS_CANCELED and
1190 constants.JOB_STATUS_CANCELED not in dep_status):
1191 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1192
1193 elif not dep_status or status in dep_status:
1194 return (self.CONTINUE,
1195 "Dependency job %s finished with status '%s'" %
1196 (dep_job_id, status))
1197
1198 else:
1199 return (self.WRONGSTATUS,
1200 "Dependency job %s finished with status '%s',"
1201 " not one of '%s' as required" %
1202 (dep_job_id, status, utils.CommaJoin(dep_status)))
1203
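  # self._waiters maps a dependency job ID to the set of _QueuedJob objects
  # waiting for it; the helper below drops map entries whose waiter set has
  # been drained.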
1205 """Remove all jobs without actual waiters.
1206
1207 """
1208 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1209 if not waiters]:
1210 del self._waiters[job_id]
1211
1214 """Queue used to manage the jobs.
1215
1216 """
1218 """Constructor for JobQueue.
1219
1220 The constructor will initialize the job queue object and then
1221 start loading the current jobs from disk, either for starting them
1222 (if they were queue) or for aborting them (if they were already
1223 running).
1224
1225 @type context: GanetiContext
1226 @param context: the context object for access to the configuration
1227 data and other ganeti objects
1228
1229 """
1230 self.context = context
1231 self._memcache = weakref.WeakValueDictionary()
1232 self._my_hostname = netutils.Hostname.GetSysName()
1233
1234
1235 self._nodes = dict((n.name, n.primary_ip)
1236 for n in cfg.GetAllNodesInfo().values()
1237 if n.master_candidate)
1238
1239
1240 self._nodes.pop(self._my_hostname, None)
1241
1242
1243 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
1244
1246 """Gets RPC runner with context.
1247
1248 """
1249 return rpc.JobQueueRunner(self.context, address_list)
1250
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      logging.error("More than half of the nodes failed")

1284 """Helper for returning the node name/ip list.
1285
1286 @rtype: (list, list)
1287 @return: a tuple of two lists, the first one with the node
1288 names and the second one with the node addresses
1289
1290 """
1291
1292 name_list = self._nodes.keys()
1293 addr_list = [self._nodes[name] for name in name_list]
1294 return name_list, addr_list
1295
1297 """Writes a file locally and then replicates it to all nodes.
1298
1299 This function will replace the contents of a file on the local
1300 node and then replicate it to all the other nodes we have.
1301
1302 @type file_name: str
1303 @param file_name: the path of the file to be replicated
1304 @type data: str
1305 @param data: the new contents of the file
1306 @type replicate: boolean
1307 @param replicate: whether to spread the changes to the remote nodes
1308
1309 """
1310 getents = runtime.GetEnts()
1311 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1312 gid=getents.daemons_gid,
1313 mode=constants.JOB_QUEUE_FILES_PERMS)
1314
1315 if replicate:
1316 names, addrs = self._GetNodeIp()
1317 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1318 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1319
1321 """Renames a file locally and then replicate the change.
1322
1323 This function will rename a file in the local queue directory
1324 and then replicate this rename to all the other nodes we have.
1325
1326 @type rename: list of (old, new)
1327 @param rename: List containing tuples mapping old to new names
1328
1329 """
1330
1331 for old, new in rename:
1332 utils.RenameFile(old, new, mkdir=True)
1333
1334
1335 names, addrs = self._GetNodeIp()
1336 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1337 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1338
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

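  # Job files are named "job-<id>" (see _GetJobPath and _GetArchivedJobPath
  # above); live jobs sit directly in QUEUE_DIR, while archived ones live in
  # per-bucket subdirectories chosen by jstore.GetArchiveDirectory().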
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

1410 """Loads a job from the disk or memory.
1411
1412 Given a job id, this will return the cached job object if
1413 existing, or try to load the job from the disk. If loading from
1414 disk, it will also add the job to the cache.
1415
1416 @type job_id: int
1417 @param job_id: the job id
1418 @rtype: L{_QueuedJob} or None
1419 @return: either None or the job object
1420
1421 """
1422 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1423
1424 job = self._memcache.get(job_id, None)
1425 if job:
1426 logging.debug("Found job %s in memcache", job_id)
1427 assert job.writable, "Found read-only job in memcache"
1428 return job
1429
1430 try:
1431 job = self._LoadJobFromDisk(job_id, False)
1432 if job is None:
1433 return job
1434 except errors.JobFileCorrupted:
1435 old_path = self._GetJobPath(job_id)
1436 new_path = self._GetArchivedJobPath(job_id)
1437 if old_path == new_path:
1438
1439 logging.exception("Can't parse job %s", job_id)
1440 else:
1441
1442 logging.exception("Can't parse job %s, will archive.", job_id)
1443 self._RenameFilesUnlocked([(old_path, new_path)])
1444 return None
1445
1446 assert job.writable, "Job just loaded is not writable"
1447
1448 self._memcache[job_id] = job
1449 logging.debug("Added job %s to the cache", job_id)
1450 return job
1451
1453 """Load the given job file from disk.
1454
1455 Given a job file, read, load and restore it in a _QueuedJob format.
1456
1457 @type job_id: int
1458 @param job_id: job identifier
1459 @type try_archived: bool
1460 @param try_archived: Whether to try loading an archived job
1461 @rtype: L{_QueuedJob} or None
1462 @return: either None or the job object
1463
1464 """
1465 path_functions = [(self._GetJobPath, False)]
1466
1467 if try_archived:
1468 path_functions.append((self._GetArchivedJobPath, True))
1469
1470 raw_data = None
1471 archived = None
1472
1473 for (fn, archived) in path_functions:
1474 filepath = fn(job_id)
1475 logging.debug("Loading job from %s", filepath)
1476 try:
1477 raw_data = utils.ReadFile(filepath)
1478 except EnvironmentError, err:
1479 if err.errno != errno.ENOENT:
1480 raise
1481 else:
1482 break
1483
1484 if not raw_data:
1485 logging.debug("No data available for job %s", job_id)
1486 return None
1487
1488 if writable is None:
1489 writable = not archived
1490
1491 try:
1492 data = serializer.LoadJson(raw_data)
1493 job = _QueuedJob.Restore(self, data, writable, archived)
1494 except Exception, err:
1495 raise errors.JobFileCorrupted(err)
1496
1497 return job
1498
1500 """Load the given job file from disk.
1501
1502 Given a job file, read, load and restore it in a _QueuedJob format.
1503 In case of error reading the job, it gets returned as None, and the
1504 exception is logged.
1505
1506 @type job_id: int
1507 @param job_id: job identifier
1508 @type try_archived: bool
1509 @param try_archived: Whether to try loading an archived job
1510 @rtype: L{_QueuedJob} or None
1511 @return: either None or the job object
1512
1513 """
1514 try:
1515 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1516 except (errors.JobFileCorrupted, EnvironmentError):
1517 logging.exception("Can't load/parse job %s", job_id)
1518 return None
1519
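  # Example for _ResolveJobDependencies() below: if resolve_fn maps the
  # relative ID -1 to the previously submitted job 1234, the dependency
  # list [(-1, [constants.JOB_STATUS_SUCCESS])] resolves to
  # (True, [(1234, [constants.JOB_STATUS_SUCCESS])]).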
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
        resolved job IDs along with the requested status; if not successful,
        the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

1559 """Gets the status of a job for dependencies.
1560
1561 @type job_id: int
1562 @param job_id: Job ID
1563 @raise errors.JobLost: If job can't be found
1564
1565 """
1566
1567
1568
1569 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1570
1571 if job:
1572 assert not job.writable, "Got writable job"
1573
1574 if job:
1575 return job.CalcStatus()
1576
1577 raise errors.JobLost("Job %s not found" % job_id)
1578
1580 """Update a job's on disk storage.
1581
1582 After a job has been modified, this function needs to be called in
1583 order to write the changes to disk and replicate them to the other
1584 nodes.
1585
1586 @type job: L{_QueuedJob}
1587 @param job: the changed job
1588 @type replicate: boolean
1589 @param replicate: whether to replicate the change to remote nodes
1590
1591 """
1592 if __debug__:
1593 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1594 assert (finalized ^ (job.end_timestamp is None))
1595 assert job.writable, "Can't update read-only job"
1596 assert not job.archived, "Can't update archived job"
1597
1598 filename = self._GetJobPath(job.id)
1599 data = serializer.DumpJson(job.Serialize())
1600 logging.debug("Writing job %s to %s", job.id, filename)
1601 self._UpdateJobQueueFile(filename, data, replicate)
1602
1604 """Checks if a job has been finalized.
1605
1606 @type job_id: int
1607 @param job_id: Job identifier
1608 @rtype: boolean
1609 @return: True if the job has been finalized,
1610 False if the timeout has been reached,
1611 None if the job doesn't exist
1612
1613 """
1614 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1615 if job is not None:
1616 return job.CalcStatus() in constants.JOBS_FINALIZED
1617 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
1618
1619
1620
1621 return True
1622 else:
1623 return None
1624
1626 """Cancels a job.
1627
1628 This will only succeed if the job has not started yet.
1629
1630 @type job_id: int
1631 @param job_id: job ID of job to be cancelled.
1632
1633 """
1634 logging.info("Cancelling job %s", job_id)
1635
1636 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1637
1639 """Changes a job's priority.
1640
1641 @type job_id: int
1642 @param job_id: ID of the job whose priority should be changed
1643 @type priority: int
1644 @param priority: New priority
1645
1646 """
1647 logging.info("Changing priority of job %s to %s", job_id, priority)
1648
1649 if priority not in constants.OP_PRIO_SUBMIT_VALID:
1650 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1651 raise errors.GenericError("Invalid priority %s, allowed are %s" %
1652 (priority, allowed))
1653
1654 def fn(job):
1655 (success, msg) = job.ChangePriority(priority)
1656 return (success, msg)
1657
1658 return self._ModifyJobUnlocked(job_id, fn)
1659
1661 """Modifies a job.
1662
1663 @type job_id: int
1664 @param job_id: Job ID
1665 @type mod_fn: callable
1666 @param mod_fn: Modifying function, receiving job object as parameter,
1667 returning tuple of (status boolean, message string)
1668
1669 """
1670 job = self._LoadJobUnlocked(job_id)
1671 if not job:
1672 logging.debug("Job %s not found", job_id)
1673 return (False, "Job %s not found" % job_id)
1674
1675 assert job.writable, "Can't modify read-only job"
1676 assert not job.archived, "Can't modify archived job"
1677
1678 (success, msg) = mod_fn(job)
1679
1680 if success:
1681
1682
1683 self.UpdateJobUnlocked(job)
1684
1685 return (success, msg)
1686