1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48
49 try:
50
51 from pyinotify import pyinotify
52 except ImportError:
53 import pyinotify
54
55 from ganeti import asyncnotifier
56 from ganeti import constants
57 from ganeti import serializer
58 from ganeti import workerpool
59 from ganeti import locking
60 from ganeti import opcodes
61 from ganeti import opcodes_base
62 from ganeti import errors
63 from ganeti import mcpu
64 from ganeti import utils
65 from ganeti import jstore
66 from ganeti import rpc
67 from ganeti import runtime
68 from ganeti import netutils
69 from ganeti import compat
70 from ganeti import ht
71 from ganeti import query
72 from ganeti import qlang
73 from ganeti import pathutils
74 from ganeti import vcluster
75
76
77 JOBQUEUE_THREADS = 25
78
79
80 _LOCK = "_lock"
81 _QUEUE = "_queue"
82
83
84 _GetIdAttr = operator.attrgetter("id")
88 """Special exception to cancel a job.
89
90 """
91
94 """Special exception to abort a job when the job queue is shutting down.
95
96 """
97
100 """Returns the current timestamp.
101
102 @rtype: tuple
103 @return: the current time in the (seconds, microseconds) format
104
105 """
106 return utils.SplitTime(time.time())
107
115
118 """Wrapper for job queries.
119
120 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
121
122 """
128
130 """Executes a job query using cached field list.
131
132 """
133 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
134
137 """Encapsulates an opcode object.
138
139 @ivar log: holds the execution log and consists of tuples
140 of the form C{(log_serial, timestamp, level, message)}
141 @ivar input: the OpCode we encapsulate
142 @ivar status: the current status
143 @ivar result: the result of the LU execution
144 @ivar start_timestamp: timestamp for the start of the execution
145 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
146 @ivar stop_timestamp: timestamp for the end of the execution
147
148 """
149 __slots__ = ["input", "status", "result", "log", "priority",
150 "start_timestamp", "exec_timestamp", "end_timestamp",
151 "__weakref__"]
152
154 """Initializes instances of this class.
155
156 @type op: L{opcodes.OpCode}
157 @param op: the opcode we encapsulate
158
159 """
160 self.input = op
161 self.status = constants.OP_STATUS_QUEUED
162 self.result = None
163 self.log = []
164 self.start_timestamp = None
165 self.exec_timestamp = None
166 self.end_timestamp = None
167
168
169 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
170
171 @classmethod
173 """Restore the _QueuedOpCode from the serialized form.
174
175 @type state: dict
176 @param state: the serialized state
177 @rtype: _QueuedOpCode
178 @return: a new _QueuedOpCode instance
179
180 """
181 obj = _QueuedOpCode.__new__(cls)
182 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
183 obj.status = state["status"]
184 obj.result = state["result"]
185 obj.log = state["log"]
186 obj.start_timestamp = state.get("start_timestamp", None)
187 obj.exec_timestamp = state.get("exec_timestamp", None)
188 obj.end_timestamp = state.get("end_timestamp", None)
189 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
190 return obj
191
193 """Serializes this _QueuedOpCode.
194
195 @rtype: dict
196 @return: the dictionary holding the serialized state
197
198 """
199 return {
200 "input": self.input.__getstate__(),
201 "status": self.status,
202 "result": self.result,
203 "log": self.log,
204 "start_timestamp": self.start_timestamp,
205 "exec_timestamp": self.exec_timestamp,
206 "end_timestamp": self.end_timestamp,
207 "priority": self.priority,
208 }
209
212 """In-memory job representation.
213
214 This is what we use to track the user-submitted jobs. Locking must
215 be taken care of by users of this class.
216
217 @type queue: L{JobQueue}
218 @ivar queue: the parent queue
219 @ivar id: the job ID
220 @type ops: list
221 @ivar ops: the list of _QueuedOpCode that constitute the job
222 @type log_serial: int
223 @ivar log_serial: holds the index for the next log entry
224 @ivar received_timestamp: the timestamp for when the job was received
225 @ivar start_timestamp: the timestamp for start of execution
226 @ivar end_timestamp: the timestamp for end of execution
227 @ivar writable: Whether the job is allowed to be modified
228
229 """
230
231 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
232 "received_timestamp", "start_timestamp", "end_timestamp",
233 "__weakref__", "processor_lock", "writable", "archived"]
234
236 """Extend the reason trail
237
238 Add the reason for all the opcodes of this job to be executed.
239
240 """
241 count = 0
242 for queued_op in self.ops:
243 op = queued_op.input
244 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
245 reason_text = "job=%d;index=%d" % (self.id, count)
246 reason = getattr(op, "reason", [])
247 reason.append((reason_src, reason_text, utils.EpochNano()))
248 op.reason = reason
249 count = count + 1
250
251 - def __init__(self, queue, job_id, ops, writable):
252 """Constructor for the _QueuedJob.
253
254 @type queue: L{JobQueue}
255 @param queue: our parent queue
256 @type job_id: job_id
257 @param job_id: our job id
258 @type ops: list
259 @param ops: the list of opcodes we hold, which will be encapsulated
260 in _QueuedOpCodes
261 @type writable: bool
262 @param writable: Whether job can be modified
263
264 """
265 if not ops:
266 raise errors.GenericError("A job needs at least one opcode")
267
268 self.queue = queue
269 self.id = int(job_id)
270 self.ops = [_QueuedOpCode(op) for op in ops]
271 self._AddReasons()
272 self.log_serial = 0
273 self.received_timestamp = TimeStampNow()
274 self.start_timestamp = None
275 self.end_timestamp = None
276 self.archived = False
277
278 self._InitInMemory(self, writable)
279
280 assert not self.archived, "New jobs can not be marked as archived"
281
282 @staticmethod
284 """Initializes in-memory variables.
285
286 """
287 obj.writable = writable
288 obj.ops_iter = None
289 obj.cur_opctx = None
290
291
292 if writable:
293 obj.processor_lock = threading.Lock()
294 else:
295 obj.processor_lock = None
296
298 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
299 "id=%s" % self.id,
300 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
301
302 return "<%s at %#x>" % (" ".join(status), id(self))
303
304 @classmethod
305 - def Restore(cls, queue, state, writable, archived):
306 """Restore a _QueuedJob from serialized state:
307
308 @type queue: L{JobQueue}
309 @param queue: to which queue the restored job belongs
310 @type state: dict
311 @param state: the serialized state
312 @type writable: bool
313 @param writable: Whether job can be modified
314 @type archived: bool
315 @param archived: Whether job was already archived
316 @rtype: _JobQueue
317 @return: the restored _JobQueue instance
318
319 """
320 obj = _QueuedJob.__new__(cls)
321 obj.queue = queue
322 obj.id = int(state["id"])
323 obj.received_timestamp = state.get("received_timestamp", None)
324 obj.start_timestamp = state.get("start_timestamp", None)
325 obj.end_timestamp = state.get("end_timestamp", None)
326 obj.archived = archived
327
328 obj.ops = []
329 obj.log_serial = 0
330 for op_state in state["ops"]:
331 op = _QueuedOpCode.Restore(op_state)
332 for log_entry in op.log:
333 obj.log_serial = max(obj.log_serial, log_entry[0])
334 obj.ops.append(op)
335
336 cls._InitInMemory(obj, writable)
337
338 return obj
339
341 """Serialize the _JobQueue instance.
342
343 @rtype: dict
344 @return: the serialized state
345
346 """
347 return {
348 "id": self.id,
349 "ops": [op.Serialize() for op in self.ops],
350 "start_timestamp": self.start_timestamp,
351 "end_timestamp": self.end_timestamp,
352 "received_timestamp": self.received_timestamp,
353 }
354
407
409 """Gets the current priority for this job.
410
411 Only unfinished opcodes are considered. When all are done, the default
412 priority is used.
413
414 @rtype: int
415
416 """
417 priorities = [op.priority for op in self.ops
418 if op.status not in constants.OPS_FINALIZED]
419
420 if not priorities:
421
422 return constants.OP_PRIO_DEFAULT
423
424 return min(priorities)
425
427 """Selectively returns the log entries.
428
429 @type newer_than: None or int
430 @param newer_than: if this is None, return all log entries,
431 otherwise return only the log entries with serial higher
432 than this value
433 @rtype: list
434 @return: the list of the log entries selected
435
436 """
437 if newer_than is None:
438 serial = -1
439 else:
440 serial = newer_than
441
442 entries = []
443 for op in self.ops:
444 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
445
446 return entries
447
449 """Returns information about a job.
450
451 @type fields: list
452 @param fields: names of fields to return
453 @rtype: list
454 @return: list with one element for each field
455 @raise errors.OpExecError: when an invalid field
456 has been passed
457
458 """
459 return _SimpleJobQuery(fields)(self)
460
462 """Mark unfinished opcodes with a given status and result.
463
464 This is a utility function for marking all running or waiting to
465 be run opcodes with a given status. Opcodes which are already
466 finalised are not changed.
467
468 @param status: a given opcode status
469 @param result: the opcode result
470
471 """
472 not_marked = True
473 for op in self.ops:
474 if op.status in constants.OPS_FINALIZED:
475 assert not_marked, "Finalized opcodes found after non-finalized ones"
476 continue
477 op.status = status
478 op.result = result
479 not_marked = False
480
482 """Marks the job as finalized.
483
484 """
485 self.end_timestamp = TimeStampNow()
486
511
513 """Changes the job priority.
514
515 @type priority: int
516 @param priority: New priority
517 @rtype: tuple; (bool, string)
518 @return: Boolean describing whether job's priority was successfully changed
519 and a text message
520
521 """
522 status = self.CalcStatus()
523
524 if status in constants.JOBS_FINALIZED:
525 return (False, "Job %s is finished" % self.id)
526 elif status == constants.JOB_STATUS_CANCELING:
527 return (False, "Job %s is cancelling" % self.id)
528 else:
529 assert status in (constants.JOB_STATUS_QUEUED,
530 constants.JOB_STATUS_WAITING,
531 constants.JOB_STATUS_RUNNING)
532
533 changed = False
534 for op in self.ops:
535 if (op.status == constants.OP_STATUS_RUNNING or
536 op.status in constants.OPS_FINALIZED):
537 assert not changed, \
538 ("Found opcode for which priority should not be changed after"
539 " priority has been changed for previous opcodes")
540 continue
541
542 assert op.status in (constants.OP_STATUS_QUEUED,
543 constants.OP_STATUS_WAITING)
544
545 changed = True
546
547
548 op.priority = priority
549
550 if changed:
551 return (True, ("Priorities of pending opcodes for job %s have been"
552 " changed to %s" % (self.id, priority)))
553 else:
554 return (False, "Job %s had no pending opcodes" % self.id)
555
558
560 """Initializes this class.
561
562 @type queue: L{JobQueue}
563 @param queue: Job queue
564 @type job: L{_QueuedJob}
565 @param job: Job object
566 @type op: L{_QueuedOpCode}
567 @param op: OpCode
568
569 """
570 super(_OpExecCallbacks, self).__init__()
571
572 assert queue, "Queue is missing"
573 assert job, "Job is missing"
574 assert op, "Opcode is missing"
575
576 self._queue = queue
577 self._job = job
578 self._op = op
579
593
594 @locking.ssynchronized(_QUEUE, shared=1)
596 """Mark the opcode as running, not lock-waiting.
597
598 This is called from the mcpu code as a notifier function, when the LU is
599 finally about to start the Exec() method. Of course, to have end-user
600 visible results, the opcode must be initially (before calling into
601 Processor.ExecOpCode) set to OP_STATUS_WAITING.
602
603 """
604 assert self._op in self._job.ops
605 assert self._op.status in (constants.OP_STATUS_WAITING,
606 constants.OP_STATUS_CANCELING)
607
608
609 self._CheckCancel()
610
611 logging.debug("Opcode is now running")
612
613 self._op.status = constants.OP_STATUS_RUNNING
614 self._op.exec_timestamp = TimeStampNow()
615
616
617 self._queue.UpdateJobUnlocked(self._job)
618
619 @locking.ssynchronized(_QUEUE, shared=1)
621 """Internal feedback append function, with locks
622
623 """
624 self._job.log_serial += 1
625 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
626 self._queue.UpdateJobUnlocked(self._job, replicate=False)
627
644
656
658 """Submits jobs for processing.
659
660 See L{JobQueue.SubmitManyJobs}.
661
662 """
663
664 return self._queue.SubmitManyJobs(jobs)
665
668 - def __init__(self, fields, prev_job_info, prev_log_serial):
669 """Initializes this class.
670
671 @type fields: list of strings
672 @param fields: Fields requested by LUXI client
673 @type prev_job_info: string
674 @param prev_job_info: previous job info, as passed by the LUXI client
675 @type prev_log_serial: string
676 @param prev_log_serial: previous job serial, as passed by the LUXI client
677
678 """
679 self._squery = _SimpleJobQuery(fields)
680 self._prev_job_info = prev_job_info
681 self._prev_log_serial = prev_log_serial
682
719
722 - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
723 """Initializes this class.
724
725 @type filename: string
726 @param filename: Path to job file
727 @raises errors.InotifyError: if the notifier cannot be setup
728
729 """
730 self._wm = _inotify_wm_cls()
731 self._inotify_handler = \
732 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
733 self._notifier = \
734 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
735 try:
736 self._inotify_handler.enable()
737 except Exception:
738
739 self._notifier.stop()
740 raise
741
743 """Callback for inotify.
744
745 """
746 if not notifier_enabled:
747 self._inotify_handler.enable()
748
749 - def Wait(self, timeout):
750 """Waits for the job file to change.
751
752 @type timeout: float
753 @param timeout: Timeout in seconds
754 @return: Whether there have been events
755
756 """
757 assert timeout >= 0
758 have_events = self._notifier.check_events(timeout * 1000)
759 if have_events:
760 self._notifier.read_events()
761 self._notifier.process_events()
762 return have_events
763
765 """Closes underlying notifier and its file descriptor.
766
767 """
768 self._notifier.stop()
769
773 """Initializes this class.
774
775 @type filename: string
776 @param filename: Path to job file
777
778 """
779 self._filewaiter = None
780 self._filename = filename
781 self._waiter_cls = _waiter_cls
782
783 - def Wait(self, timeout):
784 """Waits for a job to change.
785
786 @type timeout: float
787 @param timeout: Timeout in seconds
788 @return: Whether there have been events
789
790 """
791 if self._filewaiter:
792 return self._filewaiter.Wait(timeout)
793
794
795
796
797
798 self._filewaiter = self._waiter_cls(self._filename)
799
800 return True
801
803 """Closes underlying waiter.
804
805 """
806 if self._filewaiter:
807 self._filewaiter.Close()
808
811 """Helper class using inotify to wait for changes in a job file.
812
813 This class takes a previous job status and serial, and alerts the client when
814 the current job status has changed.
815
816 """
817 @staticmethod
819 if counter.next() > 0:
820
821
822
823 time.sleep(0.1)
824
825 job = job_load_fn()
826 if not job:
827 raise errors.JobLost()
828
829 result = check_fn(job)
830 if result is None:
831 raise utils.RetryAgain()
832
833 return result
834
835 - def __call__(self, filename, job_load_fn,
836 fields, prev_job_info, prev_log_serial, timeout,
837 _waiter_cls=_JobChangesWaiter):
838 """Waits for changes on a job.
839
840 @type filename: string
841 @param filename: File on which to wait for changes
842 @type job_load_fn: callable
843 @param job_load_fn: Function to load job
844 @type fields: list of strings
845 @param fields: Which fields to check for changes
846 @type prev_job_info: list or None
847 @param prev_job_info: Last job information returned
848 @type prev_log_serial: int
849 @param prev_log_serial: Last job message serial number
850 @type timeout: float
851 @param timeout: maximum time to wait in seconds
852
853 """
854 counter = itertools.count()
855 try:
856 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
857 waiter = _waiter_cls(filename)
858 try:
859 return utils.Retry(compat.partial(self._CheckForChanges,
860 counter, job_load_fn, check_fn),
861 utils.RETRY_REMAINING_TIME, timeout,
862 wait_fn=waiter.Wait)
863 finally:
864 waiter.Close()
865 except errors.JobLost:
866 return None
867 except utils.RetryTimeout:
868 return constants.JOB_NOTCHANGED
869
881
885 """Initializes this class.
886
887 """
888 self._fn = fn
889 self._next = None
890
892 """Gets the next timeout if necessary.
893
894 """
895 if self._next is None:
896 self._next = self._fn()
897
899 """Returns the next timeout.
900
901 """
902 self._Advance()
903 return self._next
904
906 """Returns the current timeout and advances the internal state.
907
908 """
909 self._Advance()
910 result = self._next
911 self._next = None
912 return result
913
914
915 -class _OpExecContext:
916 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
917 """Initializes this class.
918
919 """
920 self.op = op
921 self.index = index
922 self.log_prefix = log_prefix
923 self.summary = op.input.Summary()
924
925
926 if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
927 self.jobdeps = op.input.depends[:]
928 else:
929 self.jobdeps = None
930
931 self._timeout_strategy_factory = timeout_strategy_factory
932 self._ResetTimeoutStrategy()
933
935 """Creates a new timeout strategy.
936
937 """
938 self._timeout_strategy = \
939 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
940
942 """Checks whether priority can and should be increased.
943
944 Called when locks couldn't be acquired.
945
946 """
947 op = self.op
948
949
950
951 if (self._timeout_strategy.Peek() is None and
952 op.priority > constants.OP_PRIO_HIGHEST):
953 logging.debug("Increasing priority")
954 op.priority -= 1
955 self._ResetTimeoutStrategy()
956 return True
957
958 return False
959
961 """Returns the next lock acquire timeout.
962
963 """
964 return self._timeout_strategy.Next()
965
968 (DEFER,
969 WAITDEP,
970 FINISHED) = range(1, 4)
971
974 """Initializes this class.
975
976 """
977 self.queue = queue
978 self.opexec_fn = opexec_fn
979 self.job = job
980 self._timeout_strategy_factory = _timeout_strategy_factory
981
982 @staticmethod
984 """Locates the next opcode to run.
985
986 @type job: L{_QueuedJob}
987 @param job: Job object
988 @param timeout_strategy_factory: Callable to create new timeout strategy
989
990 """
991
992
993
994
995 if job.ops_iter is None:
996 job.ops_iter = enumerate(job.ops)
997
998
999 while True:
1000 try:
1001 (idx, op) = job.ops_iter.next()
1002 except StopIteration:
1003 raise errors.ProgrammerError("Called for a finished job")
1004
1005 if op.status == constants.OP_STATUS_RUNNING:
1006
1007 raise errors.ProgrammerError("Called for job marked as running")
1008
1009 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
1010 timeout_strategy_factory)
1011
1012 if op.status not in constants.OPS_FINALIZED:
1013 return opctx
1014
1015
1016
1017
1018
1019 logging.info("%s: opcode %s already processed, skipping",
1020 opctx.log_prefix, opctx.summary)
1021
1022 @staticmethod
1024 """Marks an opcode as waiting for locks.
1025
1026 The job's start timestamp is also set if necessary.
1027
1028 @type job: L{_QueuedJob}
1029 @param job: Job object
1030 @type op: L{_QueuedOpCode}
1031 @param op: Opcode object
1032
1033 """
1034 assert op in job.ops
1035 assert op.status in (constants.OP_STATUS_QUEUED,
1036 constants.OP_STATUS_WAITING)
1037
1038 update = False
1039
1040 op.result = None
1041
1042 if op.status == constants.OP_STATUS_QUEUED:
1043 op.status = constants.OP_STATUS_WAITING
1044 update = True
1045
1046 if op.start_timestamp is None:
1047 op.start_timestamp = TimeStampNow()
1048 update = True
1049
1050 if job.start_timestamp is None:
1051 job.start_timestamp = op.start_timestamp
1052 update = True
1053
1054 assert op.status == constants.OP_STATUS_WAITING
1055
1056 return update
1057
1058 @staticmethod
1060 """Checks if an opcode has dependencies and if so, processes them.
1061
1062 @type queue: L{JobQueue}
1063 @param queue: Queue object
1064 @type job: L{_QueuedJob}
1065 @param job: Job object
1066 @type opctx: L{_OpExecContext}
1067 @param opctx: Opcode execution context
1068 @rtype: bool
1069 @return: Whether opcode will be re-scheduled by dependency tracker
1070
1071 """
1072 op = opctx.op
1073
1074 result = False
1075
1076 while opctx.jobdeps:
1077 (dep_job_id, dep_status) = opctx.jobdeps[0]
1078
1079 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1080 dep_status)
1081 assert ht.TNonEmptyString(depmsg), "No dependency message"
1082
1083 logging.info("%s: %s", opctx.log_prefix, depmsg)
1084
1085 if depresult == _JobDependencyManager.CONTINUE:
1086
1087 opctx.jobdeps.pop(0)
1088
1089 elif depresult == _JobDependencyManager.WAIT:
1090
1091
1092 result = True
1093 break
1094
1095 elif depresult == _JobDependencyManager.CANCEL:
1096
1097 job.Cancel()
1098 assert op.status == constants.OP_STATUS_CANCELING
1099 break
1100
1101 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1102 _JobDependencyManager.ERROR):
1103
1104 op.status = constants.OP_STATUS_ERROR
1105 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1106 break
1107
1108 else:
1109 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1110 depresult)
1111
1112 return result
1113
1115 """Processes one opcode and returns the result.
1116
1117 """
1118 op = opctx.op
1119
1120 assert op.status in (constants.OP_STATUS_WAITING,
1121 constants.OP_STATUS_CANCELING)
1122
1123
1124 if op.status == constants.OP_STATUS_CANCELING:
1125 return (constants.OP_STATUS_CANCELING, None)
1126
1127 timeout = opctx.GetNextLockTimeout()
1128
1129 try:
1130
1131 result = self.opexec_fn(op.input,
1132 _OpExecCallbacks(self.queue, self.job, op),
1133 timeout=timeout)
1134 except mcpu.LockAcquireTimeout:
1135 assert timeout is not None, "Received timeout for blocking acquire"
1136 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1137
1138 assert op.status in (constants.OP_STATUS_WAITING,
1139 constants.OP_STATUS_CANCELING)
1140
1141
1142 if op.status == constants.OP_STATUS_CANCELING:
1143 return (constants.OP_STATUS_CANCELING, None)
1144
1145
1146 if not self.queue.AcceptingJobsUnlocked():
1147 return (constants.OP_STATUS_QUEUED, None)
1148
1149
1150 return (constants.OP_STATUS_WAITING, None)
1151 except CancelJob:
1152 logging.exception("%s: Canceling job", opctx.log_prefix)
1153 assert op.status == constants.OP_STATUS_CANCELING
1154 return (constants.OP_STATUS_CANCELING, None)
1155
1156 except QueueShutdown:
1157 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1158
1159 assert op.status == constants.OP_STATUS_WAITING
1160
1161
1162 return (constants.OP_STATUS_QUEUED, None)
1163
1164 except Exception, err:
1165 logging.exception("%s: Caught exception in %s",
1166 opctx.log_prefix, opctx.summary)
1167 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1168 else:
1169 logging.debug("%s: %s successful",
1170 opctx.log_prefix, opctx.summary)
1171 return (constants.OP_STATUS_SUCCESS, result)
1172
1174 """Continues execution of a job.
1175
1176 @param _nextop_fn: Callback function for tests
1177 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1178 be deferred and C{WAITDEP} if the dependency manager
1179 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1180
1181 """
1182 queue = self.queue
1183 job = self.job
1184
1185 logging.debug("Processing job %s", job.id)
1186
1187 queue.acquire(shared=1)
1188 try:
1189 opcount = len(job.ops)
1190
1191 assert job.writable, "Expected writable job"
1192
1193
1194 if job.CalcStatus() in constants.JOBS_FINALIZED:
1195 return self.FINISHED
1196
1197
1198 if job.cur_opctx:
1199 opctx = job.cur_opctx
1200 job.cur_opctx = None
1201 else:
1202 if __debug__ and _nextop_fn:
1203 _nextop_fn()
1204 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1205
1206 op = opctx.op
1207
1208
1209 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1210 constants.OP_STATUS_CANCELING)
1211 for i in job.ops[opctx.index + 1:])
1212
1213 assert op.status in (constants.OP_STATUS_QUEUED,
1214 constants.OP_STATUS_WAITING,
1215 constants.OP_STATUS_CANCELING)
1216
1217 assert (op.priority <= constants.OP_PRIO_LOWEST and
1218 op.priority >= constants.OP_PRIO_HIGHEST)
1219
1220 waitjob = None
1221
1222 if op.status != constants.OP_STATUS_CANCELING:
1223 assert op.status in (constants.OP_STATUS_QUEUED,
1224 constants.OP_STATUS_WAITING)
1225
1226
1227 if self._MarkWaitlock(job, op):
1228
1229 queue.UpdateJobUnlocked(job)
1230
1231 assert op.status == constants.OP_STATUS_WAITING
1232 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1233 assert job.start_timestamp and op.start_timestamp
1234 assert waitjob is None
1235
1236
1237 waitjob = self._CheckDependencies(queue, job, opctx)
1238
1239 assert op.status in (constants.OP_STATUS_WAITING,
1240 constants.OP_STATUS_CANCELING,
1241 constants.OP_STATUS_ERROR)
1242
1243 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1244 constants.OP_STATUS_ERROR)):
1245 logging.info("%s: opcode %s waiting for locks",
1246 opctx.log_prefix, opctx.summary)
1247
1248 assert not opctx.jobdeps, "Not all dependencies were removed"
1249
1250 queue.release()
1251 try:
1252 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1253 finally:
1254 queue.acquire(shared=1)
1255
1256 op.status = op_status
1257 op.result = op_result
1258
1259 assert not waitjob
1260
1261 if op.status in (constants.OP_STATUS_WAITING,
1262 constants.OP_STATUS_QUEUED):
1263
1264
1265 assert not op.end_timestamp
1266 else:
1267
1268 op.end_timestamp = TimeStampNow()
1269
1270 if op.status == constants.OP_STATUS_CANCELING:
1271 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1272 for i in job.ops[opctx.index:])
1273 else:
1274 assert op.status in constants.OPS_FINALIZED
1275
1276 if op.status == constants.OP_STATUS_QUEUED:
1277
1278 assert not waitjob
1279
1280 finalize = False
1281
1282
1283 job.cur_opctx = None
1284
1285
1286 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1287
1288 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1289 finalize = False
1290
1291 if not waitjob and opctx.CheckPriorityIncrease():
1292
1293 queue.UpdateJobUnlocked(job)
1294
1295
1296 job.cur_opctx = opctx
1297
1298 assert (op.priority <= constants.OP_PRIO_LOWEST and
1299 op.priority >= constants.OP_PRIO_HIGHEST)
1300
1301
1302 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1303
1304 else:
1305
1306 assert (opctx.index == 0 or
1307 compat.all(i.status == constants.OP_STATUS_SUCCESS
1308 for i in job.ops[:opctx.index]))
1309
1310
1311 job.cur_opctx = None
1312
1313 if op.status == constants.OP_STATUS_SUCCESS:
1314 finalize = False
1315
1316 elif op.status == constants.OP_STATUS_ERROR:
1317
1318 assert errors.GetEncodedError(job.ops[opctx.index].result)
1319
1320 to_encode = errors.OpExecError("Preceding opcode failed")
1321 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1322 _EncodeOpError(to_encode))
1323 finalize = True
1324
1325
1326 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1327 errors.GetEncodedError(i.result)
1328 for i in job.ops[opctx.index:])
1329
1330 elif op.status == constants.OP_STATUS_CANCELING:
1331 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1332 "Job canceled by request")
1333 finalize = True
1334
1335 else:
1336 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1337
1338 if opctx.index == (opcount - 1):
1339
1340 finalize = True
1341
1342 if finalize:
1343
1344 job.Finalize()
1345
1346
1347
1348 queue.UpdateJobUnlocked(job)
1349
1350 assert not waitjob
1351
1352 if finalize:
1353 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1354 return self.FINISHED
1355
1356 assert not waitjob or queue.depmgr.JobWaiting(job)
1357
1358 if waitjob:
1359 return self.WAITDEP
1360 else:
1361 return self.DEFER
1362 finally:
1363 assert job.writable, "Job became read-only while being processed"
1364 queue.release()
1365
1388
1391 """The actual job workers.
1392
1393 """
1395 """Job executor.
1396
1397 @type job: L{_QueuedJob}
1398 @param job: the job to be processed
1399
1400 """
1401 assert job.writable, "Expected writable job"
1402
1403
1404
1405
1406 job.processor_lock.acquire()
1407 try:
1408 return self._RunTaskInner(job)
1409 finally:
1410 job.processor_lock.release()
1411
1432
1433 @staticmethod
1435 """Updates the worker thread name to include a short summary of the opcode.
1436
1437 @param setname_fn: Callable setting worker thread name
1438 @param execop_fn: Callable for executing opcode (usually
1439 L{mcpu.Processor.ExecOpCode})
1440
1441 """
1442 setname_fn(op)
1443 try:
1444 return execop_fn(op, *args, **kwargs)
1445 finally:
1446 setname_fn(None)
1447
1448 @staticmethod
1450 """Sets the worker thread name.
1451
1452 @type job: L{_QueuedJob}
1453 @type op: L{opcodes.OpCode}
1454
1455 """
1456 parts = ["Job%s" % job.id]
1457
1458 if op:
1459 parts.append(op.TinySummary())
1460
1461 return "/".join(parts)
1462
1465 """Simple class implementing a job-processing workerpool.
1466
1467 """
1473
1476 """Keeps track of job dependencies.
1477
1478 """
1479 (WAIT,
1480 ERROR,
1481 CANCEL,
1482 CONTINUE,
1483 WRONGSTATUS) = range(1, 6)
1484
1485 - def __init__(self, getstatus_fn, enqueue_fn):
1486 """Initializes this class.
1487
1488 """
1489 self._getstatus_fn = getstatus_fn
1490 self._enqueue_fn = enqueue_fn
1491
1492 self._waiters = {}
1493 self._lock = locking.SharedLock("JobDepMgr")
1494
1495 @locking.ssynchronized(_LOCK, shared=1)
1497 """Retrieves information about waiting jobs.
1498
1499 @type requested: set
1500 @param requested: Requested information, see C{query.LQ_*}
1501
1502 """
1503
1504
1505
1506 return [("job/%s" % job_id, None, None,
1507 [("job", [job.id for job in waiters])])
1508 for job_id, waiters in self._waiters.items()
1509 if waiters]
1510
1511 @locking.ssynchronized(_LOCK, shared=1)
1513 """Checks if a job is waiting.
1514
1515 """
1516 return compat.any(job in jobs
1517 for jobs in self._waiters.values())
1518
1519 @locking.ssynchronized(_LOCK)
1521 """Checks if a dependency job has the requested status.
1522
1523 If the other job is not yet in a finalized status, the calling job will be
1524 notified (re-added to the workerpool) at a later point.
1525
1526 @type job: L{_QueuedJob}
1527 @param job: Job object
1528 @type dep_job_id: int
1529 @param dep_job_id: ID of dependency job
1530 @type dep_status: list
1531 @param dep_status: Required status
1532
1533 """
1534 assert ht.TJobId(job.id)
1535 assert ht.TJobId(dep_job_id)
1536 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1537
1538 if job.id == dep_job_id:
1539 return (self.ERROR, "Job can't depend on itself")
1540
1541
1542 try:
1543 status = self._getstatus_fn(dep_job_id)
1544 except errors.JobLost, err:
1545 return (self.ERROR, "Dependency error: %s" % err)
1546
1547 assert status in constants.JOB_STATUS_ALL
1548
1549 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1550
1551 if status not in constants.JOBS_FINALIZED:
1552
1553 job_id_waiters.add(job)
1554 return (self.WAIT,
1555 "Need to wait for job %s, wanted status '%s'" %
1556 (dep_job_id, dep_status))
1557
1558
1559 if job in job_id_waiters:
1560 job_id_waiters.remove(job)
1561
1562 if (status == constants.JOB_STATUS_CANCELED and
1563 constants.JOB_STATUS_CANCELED not in dep_status):
1564 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1565
1566 elif not dep_status or status in dep_status:
1567 return (self.CONTINUE,
1568 "Dependency job %s finished with status '%s'" %
1569 (dep_job_id, status))
1570
1571 else:
1572 return (self.WRONGSTATUS,
1573 "Dependency job %s finished with status '%s',"
1574 " not one of '%s' as required" %
1575 (dep_job_id, status, utils.CommaJoin(dep_status)))
1576
1578 """Remove all jobs without actual waiters.
1579
1580 """
1581 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1582 if not waiters]:
1583 del self._waiters[job_id]
1584
1586 """Notifies all jobs waiting for a certain job ID.
1587
1588 @attention: Do not call until L{CheckAndRegister} returned a status other
1589 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1590 @type job_id: int
1591 @param job_id: Job ID
1592
1593 """
1594 assert ht.TJobId(job_id)
1595
1596 self._lock.acquire()
1597 try:
1598 self._RemoveEmptyWaitersUnlocked()
1599
1600 jobs = self._waiters.pop(job_id, None)
1601 finally:
1602 self._lock.release()
1603
1604 if jobs:
1605
1606 logging.debug("Re-adding %s jobs which were waiting for job %s",
1607 len(jobs), job_id)
1608 self._enqueue_fn(jobs)
1609
1612 """Decorator for "public" functions.
1613
1614 This function should be used for all 'public' functions. That is,
1615 functions usually called from other classes. Note that this should
1616 be applied only to methods (not plain functions), since it expects
1617 that the decorated function is called with a first argument that has
1618 a '_queue_filelock' argument.
1619
1620 @warning: Use this decorator only after locking.ssynchronized
1621
1622 Example::
1623 @locking.ssynchronized(_LOCK)
1624 @_RequireOpenQueue
1625 def Example(self):
1626 pass
1627
1628 """
1629 def wrapper(self, *args, **kwargs):
1630
1631 assert self._queue_filelock is not None, "Queue should be open"
1632 return fn(self, *args, **kwargs)
1633 return wrapper
1634
1637 """Decorator checking for a non-drained queue.
1638
1639 To be used with functions submitting new jobs.
1640
1641 """
1642 def wrapper(self, *args, **kwargs):
1643 """Wrapper function.
1644
1645 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1646
1647 """
1648
1649
1650
1651 if self._drained:
1652 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1653
1654 if not self._accepting_jobs:
1655 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1656
1657 return fn(self, *args, **kwargs)
1658 return wrapper
1659
1662 """Queue used to manage the jobs.
1663
1664 """
1666 """Constructor for JobQueue.
1667
1668 The constructor will initialize the job queue object and then
1669 start loading the current jobs from disk, either for starting them
1670 (if they were queue) or for aborting them (if they were already
1671 running).
1672
1673 @type context: GanetiContext
1674 @param context: the context object for access to the configuration
1675 data and other ganeti objects
1676
1677 """
1678 self.context = context
1679 self._memcache = weakref.WeakValueDictionary()
1680 self._my_hostname = netutils.Hostname.GetSysName()
1681
1682
1683
1684
1685
1686
1687 self._lock = locking.SharedLock("JobQueue")
1688
1689 self.acquire = self._lock.acquire
1690 self.release = self._lock.release
1691
1692
1693 self._accepting_jobs = True
1694
1695
1696
1697 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1698
1699
1700 self._last_serial = jstore.ReadSerial()
1701 assert self._last_serial is not None, ("Serial file was modified between"
1702 " check in jstore and here")
1703
1704
1705 self._nodes = dict((n.name, n.primary_ip)
1706 for n in self.context.cfg.GetAllNodesInfo().values()
1707 if n.master_candidate)
1708
1709
1710 self._nodes.pop(self._my_hostname, None)
1711
1712
1713
1714 self._queue_size = None
1715 self._UpdateQueueSizeUnlocked()
1716 assert ht.TInt(self._queue_size)
1717 self._drained = jstore.CheckDrainFlag()
1718
1719
1720 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1721 self._EnqueueJobs)
1722 self.context.glm.AddToLockMonitor(self.depmgr)
1723
1724
1725 self._wpool = _JobQueueWorkerPool(self)
1726 try:
1727 self._InspectQueue()
1728 except:
1729 self._wpool.TerminateWorkers()
1730 raise
1731
1732 @locking.ssynchronized(_LOCK)
1733 @_RequireOpenQueue
1735 """Loads the whole job queue and resumes unfinished jobs.
1736
1737 This function needs the lock here because WorkerPool.AddTask() may start a
1738 job while we're still doing our work.
1739
1740 """
1741 logging.info("Inspecting job queue")
1742
1743 restartjobs = []
1744
1745 all_job_ids = self._GetJobIDsUnlocked()
1746 jobs_count = len(all_job_ids)
1747 lastinfo = time.time()
1748 for idx, job_id in enumerate(all_job_ids):
1749
1750 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1751 idx == (jobs_count - 1)):
1752 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1753 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1754 lastinfo = time.time()
1755
1756 job = self._LoadJobUnlocked(job_id)
1757
1758
1759 if job is None:
1760 continue
1761
1762 status = job.CalcStatus()
1763
1764 if status == constants.JOB_STATUS_QUEUED:
1765 restartjobs.append(job)
1766
1767 elif status in (constants.JOB_STATUS_RUNNING,
1768 constants.JOB_STATUS_WAITING,
1769 constants.JOB_STATUS_CANCELING):
1770 logging.warning("Unfinished job %s found: %s", job.id, job)
1771
1772 if status == constants.JOB_STATUS_WAITING:
1773
1774 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1775 restartjobs.append(job)
1776 else:
1777 to_encode = errors.OpExecError("Unclean master daemon shutdown")
1778 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1779 _EncodeOpError(to_encode))
1780 job.Finalize()
1781
1782 self.UpdateJobUnlocked(job)
1783
1784 if restartjobs:
1785 logging.info("Restarting %s jobs", len(restartjobs))
1786 self._EnqueueJobsUnlocked(restartjobs)
1787
1788 logging.info("Job queue inspection finished")
1789
1791 """Gets RPC runner with context.
1792
1793 """
1794 return rpc.JobQueueRunner(self.context, address_list)
1795
1796 @locking.ssynchronized(_LOCK)
1797 @_RequireOpenQueue
1799 """Register a new node with the queue.
1800
1801 @type node: L{objects.Node}
1802 @param node: the node object to be added
1803
1804 """
1805 node_name = node.name
1806 assert node_name != self._my_hostname
1807
1808
1809 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1810 msg = result.fail_msg
1811 if msg:
1812 logging.warning("Cannot cleanup queue directory on node %s: %s",
1813 node_name, msg)
1814
1815 if not node.master_candidate:
1816
1817 self._nodes.pop(node_name, None)
1818
1819 return
1820
1821
1822 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1823
1824
1825 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1826
1827
1828 addrs = [node.primary_ip]
1829
1830 for file_name in files:
1831
1832 content = utils.ReadFile(file_name)
1833
1834 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1835 file_name, content)
1836 msg = result[node_name].fail_msg
1837 if msg:
1838 logging.error("Failed to upload file %s to node %s: %s",
1839 file_name, node_name, msg)
1840
1841
1842 result = \
1843 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1844 self._drained)
1845 msg = result[node_name].fail_msg
1846 if msg:
1847 logging.error("Failed to set queue drained flag on node %s: %s",
1848 node_name, msg)
1849
1850 self._nodes[node_name] = node.primary_ip
1851
1852 @locking.ssynchronized(_LOCK)
1853 @_RequireOpenQueue
1855 """Callback called when removing nodes from the cluster.
1856
1857 @type node_name: str
1858 @param node_name: the name of the node to remove
1859
1860 """
1861 self._nodes.pop(node_name, None)
1862
1863 @staticmethod
1865 """Verifies the status of an RPC call.
1866
1867 Since we aim to keep consistency should this node (the current
1868 master) fail, we will log errors if our rpc fail, and especially
1869 log the case when more than half of the nodes fails.
1870
1871 @param result: the data as returned from the rpc call
1872 @type nodes: list
1873 @param nodes: the list of nodes we made the call to
1874 @type failmsg: str
1875 @param failmsg: the identifier to be used for logging
1876
1877 """
1878 failed = []
1879 success = []
1880
1881 for node in nodes:
1882 msg = result[node].fail_msg
1883 if msg:
1884 failed.append(node)
1885 logging.error("RPC call %s (%s) failed on node %s: %s",
1886 result[node].call, failmsg, node, msg)
1887 else:
1888 success.append(node)
1889
1890
1891 if (len(success) + 1) < len(failed):
1892
1893 logging.error("More than half of the nodes failed")
1894
1896 """Helper for returning the node name/ip list.
1897
1898 @rtype: (list, list)
1899 @return: a tuple of two lists, the first one with the node
1900 names and the second one with the node addresses
1901
1902 """
1903
1904 name_list = self._nodes.keys()
1905 addr_list = [self._nodes[name] for name in name_list]
1906 return name_list, addr_list
1907
1909 """Writes a file locally and then replicates it to all nodes.
1910
1911 This function will replace the contents of a file on the local
1912 node and then replicate it to all the other nodes we have.
1913
1914 @type file_name: str
1915 @param file_name: the path of the file to be replicated
1916 @type data: str
1917 @param data: the new contents of the file
1918 @type replicate: boolean
1919 @param replicate: whether to spread the changes to the remote nodes
1920
1921 """
1922 getents = runtime.GetEnts()
1923 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1924 gid=getents.daemons_gid,
1925 mode=constants.JOB_QUEUE_FILES_PERMS)
1926
1927 if replicate:
1928 names, addrs = self._GetNodeIp()
1929 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1930 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1931
1933 """Renames a file locally and then replicate the change.
1934
1935 This function will rename a file in the local queue directory
1936 and then replicate this rename to all the other nodes we have.
1937
1938 @type rename: list of (old, new)
1939 @param rename: List containing tuples mapping old to new names
1940
1941 """
1942
1943 for old, new in rename:
1944 utils.RenameFile(old, new, mkdir=True)
1945
1946
1947 names, addrs = self._GetNodeIp()
1948 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1949 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1950
1952 """Generates a new job identifier.
1953
1954 Job identifiers are unique during the lifetime of a cluster.
1955
1956 @type count: integer
1957 @param count: how many serials to return
1958 @rtype: list of int
1959 @return: a list of job identifiers.
1960
1961 """
1962 assert ht.TNonNegativeInt(count)
1963
1964
1965 serial = self._last_serial + count
1966
1967
1968 self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1969 "%s\n" % serial, True)
1970
1971 result = [jstore.FormatJobID(v)
1972 for v in range(self._last_serial + 1, serial + 1)]
1973
1974
1975 self._last_serial = serial
1976
1977 assert len(result) == count
1978
1979 return result
1980
1981 @staticmethod
1983 """Returns the job file for a given job id.
1984
1985 @type job_id: str
1986 @param job_id: the job identifier
1987 @rtype: str
1988 @return: the path to the job file
1989
1990 """
1991 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1992
1993 @staticmethod
1995 """Returns the archived job file for a give job id.
1996
1997 @type job_id: str
1998 @param job_id: the job identifier
1999 @rtype: str
2000 @return: the path to the archived job file
2001
2002 """
2003 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
2004 jstore.GetArchiveDirectory(job_id),
2005 "job-%s" % job_id)
2006
2007 @staticmethod
2009 """Build list of directories containing job files.
2010
2011 @type archived: bool
2012 @param archived: Whether to include directories for archived jobs
2013 @rtype: list
2014
2015 """
2016 result = [pathutils.QUEUE_DIR]
2017
2018 if archived:
2019 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2020 result.extend(map(compat.partial(utils.PathJoin, archive_path),
2021 utils.ListVisibleFiles(archive_path)))
2022
2023 return result
2024
2025 @classmethod
2027 """Return all known job IDs.
2028
2029 The method only looks at disk because it's a requirement that all
2030 jobs are present on disk (so in the _memcache we don't have any
2031 extra IDs).
2032
2033 @type sort: boolean
2034 @param sort: perform sorting on the returned job ids
2035 @rtype: list
2036 @return: the list of job IDs
2037
2038 """
2039 jlist = []
2040
2041 for path in cls._DetermineJobDirectories(archived):
2042 for filename in utils.ListVisibleFiles(path):
2043 m = constants.JOB_FILE_RE.match(filename)
2044 if m:
2045 jlist.append(int(m.group(1)))
2046
2047 if sort:
2048 jlist.sort()
2049 return jlist
2050
2052 """Loads a job from the disk or memory.
2053
2054 Given a job id, this will return the cached job object if
2055 existing, or try to load the job from the disk. If loading from
2056 disk, it will also add the job to the cache.
2057
2058 @type job_id: int
2059 @param job_id: the job id
2060 @rtype: L{_QueuedJob} or None
2061 @return: either None or the job object
2062
2063 """
2064 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2065
2066 job = self._memcache.get(job_id, None)
2067 if job:
2068 logging.debug("Found job %s in memcache", job_id)
2069 assert job.writable, "Found read-only job in memcache"
2070 return job
2071
2072 try:
2073 job = self._LoadJobFromDisk(job_id, False)
2074 if job is None:
2075 return job
2076 except errors.JobFileCorrupted:
2077 old_path = self._GetJobPath(job_id)
2078 new_path = self._GetArchivedJobPath(job_id)
2079 if old_path == new_path:
2080
2081 logging.exception("Can't parse job %s", job_id)
2082 else:
2083
2084 logging.exception("Can't parse job %s, will archive.", job_id)
2085 self._RenameFilesUnlocked([(old_path, new_path)])
2086 return None
2087
2088 assert job.writable, "Job just loaded is not writable"
2089
2090 self._memcache[job_id] = job
2091 logging.debug("Added job %s to the cache", job_id)
2092 return job
2093
2095 """Load the given job file from disk.
2096
2097 Given a job file, read, load and restore it in a _QueuedJob format.
2098
2099 @type job_id: int
2100 @param job_id: job identifier
2101 @type try_archived: bool
2102 @param try_archived: Whether to try loading an archived job
2103 @rtype: L{_QueuedJob} or None
2104 @return: either None or the job object
2105
2106 """
2107 path_functions = [(self._GetJobPath, False)]
2108
2109 if try_archived:
2110 path_functions.append((self._GetArchivedJobPath, True))
2111
2112 raw_data = None
2113 archived = None
2114
2115 for (fn, archived) in path_functions:
2116 filepath = fn(job_id)
2117 logging.debug("Loading job from %s", filepath)
2118 try:
2119 raw_data = utils.ReadFile(filepath)
2120 except EnvironmentError, err:
2121 if err.errno != errno.ENOENT:
2122 raise
2123 else:
2124 break
2125
2126 if not raw_data:
2127 return None
2128
2129 if writable is None:
2130 writable = not archived
2131
2132 try:
2133 data = serializer.LoadJson(raw_data)
2134 job = _QueuedJob.Restore(self, data, writable, archived)
2135 except Exception, err:
2136 raise errors.JobFileCorrupted(err)
2137
2138 return job
2139
2141 """Load the given job file from disk.
2142
2143 Given a job file, read, load and restore it in a _QueuedJob format.
2144 In case of error reading the job, it gets returned as None, and the
2145 exception is logged.
2146
2147 @type job_id: int
2148 @param job_id: job identifier
2149 @type try_archived: bool
2150 @param try_archived: Whether to try loading an archived job
2151 @rtype: L{_QueuedJob} or None
2152 @return: either None or the job object
2153
2154 """
2155 try:
2156 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2157 except (errors.JobFileCorrupted, EnvironmentError):
2158 logging.exception("Can't load/parse job %s", job_id)
2159 return None
2160
2162 """Update the queue size.
2163
2164 """
2165 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2166
2167 @locking.ssynchronized(_LOCK)
2168 @_RequireOpenQueue
2170 """Sets the drain flag for the queue.
2171
2172 @type drain_flag: boolean
2173 @param drain_flag: Whether to set or unset the drain flag
2174
2175 """
2176
2177 jstore.SetDrainFlag(drain_flag)
2178
2179 self._drained = drain_flag
2180
2181
2182 (names, addrs) = self._GetNodeIp()
2183 result = \
2184 self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2185 self._CheckRpcResult(result, self._nodes,
2186 "Setting queue drain flag to %s" % drain_flag)
2187
2188 return True
2189
2190 @_RequireOpenQueue
2192 """Create and store a new job.
2193
2194 This enters the job into our job queue and also puts it on the new
2195 queue, in order for it to be picked up by the queue processors.
2196
2197 @type job_id: job ID
2198 @param job_id: the job ID for the new job
2199 @type ops: list
2200 @param ops: The list of OpCodes that will become the new job.
2201 @rtype: L{_QueuedJob}
2202 @return: the job object to be queued
2203 @raise errors.JobQueueFull: if the job queue has too many jobs in it
2204 @raise errors.GenericError: If an opcode is not valid
2205
2206 """
2207 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2208 raise errors.JobQueueFull()
2209
2210 job = _QueuedJob(self, job_id, ops, True)
2211
2212 for idx, op in enumerate(job.ops):
2213
2214 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2215 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2216 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2217 " are %s" % (idx, op.priority, allowed))
2218
2219
2220 dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
2221 if not opcodes_base.TNoRelativeJobDependencies(dependencies):
2222 raise errors.GenericError("Opcode %s has invalid dependencies, must"
2223 " match %s: %s" %
2224 (idx, opcodes_base.TNoRelativeJobDependencies,
2225 dependencies))
2226
2227
2228 self.UpdateJobUnlocked(job)
2229
2230 self._queue_size += 1
2231
2232 logging.debug("Adding new job %s to the cache", job_id)
2233 self._memcache[job_id] = job
2234
2235 return job
2236
2237 @locking.ssynchronized(_LOCK)
2238 @_RequireOpenQueue
2239 @_RequireNonDrainedQueue
2249
2250 @locking.ssynchronized(_LOCK)
2251 @_RequireOpenQueue
2262
2263 @locking.ssynchronized(_LOCK)
2264 @_RequireOpenQueue
2265 @_RequireNonDrainedQueue
2267 """Create and store multiple jobs.
2268
2269 @see: L{_SubmitJobUnlocked}
2270
2271 """
2272 all_job_ids = self._NewSerialsUnlocked(len(jobs))
2273
2274 (results, added_jobs) = \
2275 self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2276
2277 self._EnqueueJobsUnlocked(added_jobs)
2278
2279 return results
2280
2281 @staticmethod
2288
2289 @staticmethod
2291 """Resolves relative job IDs in dependencies.
2292
2293 @type resolve_fn: callable
2294 @param resolve_fn: Function to resolve a relative job ID
2295 @type deps: list
2296 @param deps: Dependencies
2297 @rtype: tuple; (boolean, string or list)
2298 @return: If successful (first tuple item), the returned list contains
2299 resolved job IDs along with the requested status; if not successful,
2300 the second element is an error message
2301
2302 """
2303 result = []
2304
2305 for (dep_job_id, dep_status) in deps:
2306 if ht.TRelativeJobId(dep_job_id):
2307 assert ht.TInt(dep_job_id) and dep_job_id < 0
2308 try:
2309 job_id = resolve_fn(dep_job_id)
2310 except IndexError:
2311
2312 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2313 else:
2314 job_id = dep_job_id
2315
2316 result.append((job_id, dep_status))
2317
2318 return (True, result)
2319
2321 """Create and store multiple jobs.
2322
2323 @see: L{_SubmitJobUnlocked}
2324
2325 """
2326 results = []
2327 added_jobs = []
2328
2329 def resolve_fn(job_idx, reljobid):
2330 assert reljobid < 0
2331 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2332
2333 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2334 for op in ops:
2335 if getattr(op, opcodes_base.DEPEND_ATTR, None):
2336 (status, data) = \
2337 self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2338 op.depends)
2339 if not status:
2340
2341 assert ht.TNonEmptyString(data), "No error message"
2342 break
2343
2344 op.depends = data
2345 else:
2346 try:
2347 job = self._SubmitJobUnlocked(job_id, ops)
2348 except errors.GenericError, err:
2349 status = False
2350 data = self._FormatSubmitError(str(err), ops)
2351 else:
2352 status = True
2353 data = job_id
2354 added_jobs.append(job)
2355
2356 results.append((status, data))
2357
2358 return (results, added_jobs)
2359
2360 @locking.ssynchronized(_LOCK)
2362 """Helper function to add jobs to worker pool's queue.
2363
2364 @type jobs: list
2365 @param jobs: List of all jobs
2366
2367 """
2368 return self._EnqueueJobsUnlocked(jobs)
2369
2371 """Helper function to add jobs to worker pool's queue.
2372
2373 @type jobs: list
2374 @param jobs: List of all jobs
2375
2376 """
2377 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2378 self._wpool.AddManyTasks([(job, ) for job in jobs],
2379 priority=[job.CalcPriority() for job in jobs],
2380 task_id=map(_GetIdAttr, jobs))
2381
2383 """Gets the status of a job for dependencies.
2384
2385 @type job_id: int
2386 @param job_id: Job ID
2387 @raise errors.JobLost: If job can't be found
2388
2389 """
2390
2391
2392
2393 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2394
2395 assert not job.writable, "Got writable job"
2396
2397 if job:
2398 return job.CalcStatus()
2399
2400 raise errors.JobLost("Job %s not found" % job_id)
2401
2402 @_RequireOpenQueue
2404 """Update a job's on disk storage.
2405
2406 After a job has been modified, this function needs to be called in
2407 order to write the changes to disk and replicate them to the other
2408 nodes.
2409
2410 @type job: L{_QueuedJob}
2411 @param job: the changed job
2412 @type replicate: boolean
2413 @param replicate: whether to replicate the change to remote nodes
2414
2415 """
2416 if __debug__:
2417 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2418 assert (finalized ^ (job.end_timestamp is None))
2419 assert job.writable, "Can't update read-only job"
2420 assert not job.archived, "Can't update archived job"
2421
2422 filename = self._GetJobPath(job.id)
2423 data = serializer.DumpJson(job.Serialize())
2424 logging.debug("Writing job %s to %s", job.id, filename)
2425 self._UpdateJobQueueFile(filename, data, replicate)
2426
2427 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2428 timeout):
2429 """Waits for changes in a job.
2430
2431 @type job_id: int
2432 @param job_id: Job identifier
2433 @type fields: list of strings
2434 @param fields: Which fields to check for changes
2435 @type prev_job_info: list or None
2436 @param prev_job_info: Last job information returned
2437 @type prev_log_serial: int
2438 @param prev_log_serial: Last job message serial number
2439 @type timeout: float
2440 @param timeout: maximum time to wait in seconds
2441 @rtype: tuple (job info, log entries)
2442 @return: a tuple of the job information as required via
2443 the fields parameter, and the log entries as a list
2444
2445 if the job has not changed and the timeout has expired,
2446 we instead return a special value,
2447 L{constants.JOB_NOTCHANGED}, which should be interpreted
2448 as such by the clients
2449
2450 """
2451 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2452 writable=False)
2453
2454 helper = _WaitForJobChangesHelper()
2455
2456 return helper(self._GetJobPath(job_id), load_fn,
2457 fields, prev_job_info, prev_log_serial, timeout)
2458
2459 @locking.ssynchronized(_LOCK)
2460 @_RequireOpenQueue
2462 """Cancels a job.
2463
2464 This will only succeed if the job has not started yet.
2465
2466 @type job_id: int
2467 @param job_id: job ID of job to be cancelled.
2468
2469 """
2470 logging.info("Cancelling job %s", job_id)
2471
2472 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2473
2474 @locking.ssynchronized(_LOCK)
2475 @_RequireOpenQueue
2477 """Changes a job's priority.
2478
2479 @type job_id: int
2480 @param job_id: ID of the job whose priority should be changed
2481 @type priority: int
2482 @param priority: New priority
2483
2484 """
2485 logging.info("Changing priority of job %s to %s", job_id, priority)
2486
2487 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2488 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2489 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2490 (priority, allowed))
2491
2492 def fn(job):
2493 (success, msg) = job.ChangePriority(priority)
2494
2495 if success:
2496 try:
2497 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2498 except workerpool.NoSuchTask:
2499 logging.debug("Job %s is not in workerpool at this time", job.id)
2500
2501 return (success, msg)
2502
2503 return self._ModifyJobUnlocked(job_id, fn)
2504
2506 """Modifies a job.
2507
2508 @type job_id: int
2509 @param job_id: Job ID
2510 @type mod_fn: callable
2511 @param mod_fn: Modifying function, receiving job object as parameter,
2512 returning tuple of (status boolean, message string)
2513
2514 """
2515 job = self._LoadJobUnlocked(job_id)
2516 if not job:
2517 logging.debug("Job %s not found", job_id)
2518 return (False, "Job %s not found" % job_id)
2519
2520 assert job.writable, "Can't modify read-only job"
2521 assert not job.archived, "Can't modify archived job"
2522
2523 (success, msg) = mod_fn(job)
2524
2525 if success:
2526
2527
2528 self.UpdateJobUnlocked(job)
2529
2530 return (success, msg)
2531
2532 @_RequireOpenQueue
2534 """Archives jobs.
2535
2536 @type jobs: list of L{_QueuedJob}
2537 @param jobs: Job objects
2538 @rtype: int
2539 @return: Number of archived jobs
2540
2541 """
2542 archive_jobs = []
2543 rename_files = []
2544 for job in jobs:
2545 assert job.writable, "Can't archive read-only job"
2546 assert not job.archived, "Can't cancel archived job"
2547
2548 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2549 logging.debug("Job %s is not yet done", job.id)
2550 continue
2551
2552 archive_jobs.append(job)
2553
2554 old = self._GetJobPath(job.id)
2555 new = self._GetArchivedJobPath(job.id)
2556 rename_files.append((old, new))
2557
2558
2559 self._RenameFilesUnlocked(rename_files)
2560
2561 logging.debug("Successfully archived job(s) %s",
2562 utils.CommaJoin(job.id for job in archive_jobs))
2563
2564
2565
2566
2567
2568 self._UpdateQueueSizeUnlocked()
2569 return len(archive_jobs)
2570
2571 @locking.ssynchronized(_LOCK)
2572 @_RequireOpenQueue
2574 """Archives a job.
2575
2576 This is just a wrapper over L{_ArchiveJobsUnlocked}.
2577
2578 @type job_id: int
2579 @param job_id: Job ID of job to be archived.
2580 @rtype: bool
2581 @return: Whether job was archived
2582
2583 """
2584 logging.info("Archiving job %s", job_id)
2585
2586 job = self._LoadJobUnlocked(job_id)
2587 if not job:
2588 logging.debug("Job %s not found", job_id)
2589 return False
2590
2591 return self._ArchiveJobsUnlocked([job]) == 1
2592
2593 @locking.ssynchronized(_LOCK)
2594 @_RequireOpenQueue
2596 """Archives all jobs based on age.
2597
2598 The method will archive all jobs which are older than the age
2599 parameter. For jobs that don't have an end timestamp, the start
2600 timestamp will be considered. The special '-1' age will cause
2601 archival of all jobs (that are not running or queued).
2602
2603 @type age: int
2604 @param age: the minimum age in seconds
2605
2606 """
2607 logging.info("Archiving jobs with age more than %s seconds", age)
2608
2609 now = time.time()
2610 end_time = now + timeout
2611 archived_count = 0
2612 last_touched = 0
2613
2614 all_job_ids = self._GetJobIDsUnlocked()
2615 pending = []
2616 for idx, job_id in enumerate(all_job_ids):
2617 last_touched = idx + 1
2618
2619
2620
2621
2622 if time.time() > end_time:
2623 break
2624
2625
2626 job = self._LoadJobUnlocked(job_id)
2627 if job:
2628 if job.end_timestamp is None:
2629 if job.start_timestamp is None:
2630 job_age = job.received_timestamp
2631 else:
2632 job_age = job.start_timestamp
2633 else:
2634 job_age = job.end_timestamp
2635
2636 if age == -1 or now - job_age[0] > age:
2637 pending.append(job)
2638
2639
2640 if len(pending) >= 10:
2641 archived_count += self._ArchiveJobsUnlocked(pending)
2642 pending = []
2643
2644 if pending:
2645 archived_count += self._ArchiveJobsUnlocked(pending)
2646
2647 return (archived_count, len(all_job_ids) - last_touched)
2648
2649 - def _Query(self, fields, qfilter):
2650 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2651 namefield="id")
2652
2653
2654
2655
2656 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2657
2658 job_ids = qobj.RequestedNames()
2659
2660 list_all = (job_ids is None)
2661
2662 if list_all:
2663
2664
2665 job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2666
2667 jobs = []
2668
2669 for job_id in job_ids:
2670 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2671 if job is not None or not list_all:
2672 jobs.append((job_id, job))
2673
2674 return (qobj, jobs, list_all)
2675
2677 """Returns a list of jobs in queue.
2678
2679 @type fields: sequence
2680 @param fields: List of wanted fields
2681 @type qfilter: None or query2 filter (list)
2682 @param qfilter: Query filter
2683
2684 """
2685 (qobj, ctx, _) = self._Query(fields, qfilter)
2686
2687 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2688
2690 """Returns a list of jobs in queue.
2691
2692 @type job_ids: list
2693 @param job_ids: sequence of job identifiers or None for all
2694 @type fields: list
2695 @param fields: names of fields to return
2696 @rtype: list
2697 @return: list one element per job, each element being list with
2698 the requested fields
2699
2700 """
2701
2702 job_ids = [int(jid) for jid in job_ids]
2703 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2704
2705 (qobj, ctx, _) = self._Query(fields, qfilter)
2706
2707 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2708
2709 @locking.ssynchronized(_LOCK)
2711 """Prepare to stop the job queue.
2712
2713 Disables execution of jobs in the workerpool and returns whether there are
2714 any jobs currently running. If the latter is the case, the job queue is not
2715 yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2716 be called without interfering with any job. Queued and unfinished jobs will
2717 be resumed next time.
2718
2719 Once this function has been called no new job submissions will be accepted
2720 (see L{_RequireNonDrainedQueue}).
2721
2722 @rtype: bool
2723 @return: Whether there are any running jobs
2724
2725 """
2726 if self._accepting_jobs:
2727 self._accepting_jobs = False
2728
2729
2730 self._wpool.SetActive(False)
2731
2732 return self._wpool.HasRunningTasks()
2733
2735 """Returns whether jobs are accepted.
2736
2737 Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2738 queue is shutting down.
2739
2740 @rtype: bool
2741
2742 """
2743 return self._accepting_jobs
2744
2745 @locking.ssynchronized(_LOCK)
2746 @_RequireOpenQueue
2748 """Stops the job queue.
2749
2750 This shutdowns all the worker threads an closes the queue.
2751
2752 """
2753 self._wpool.TerminateWorkers()
2754
2755 self._queue_filelock.Close()
2756 self._queue_filelock = None
2757