"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # some pyinotify packages nest the actual module one level deeper
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25


_LOCK = "_lock"
_QUEUE = "_queue"


_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

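# Illustrative note (not part of the original module): utils.SplitTime()
# splits a float POSIX timestamp into whole seconds and microseconds, so for
# time.time() == 1234567890.123456, TimeStampNow() returns approximately
# (1234567890, 123456).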


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery(object):
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }

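# Illustrative round-trip sketch (not part of the original module); assumes
# a concrete opcode class such as opcodes.OpTestDelay is available:
#
#   op = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   state = op.Serialize()
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == op.status and clone.priority == op.priority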

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """

  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def _AddReasons(self):
    """Extend the reason trail.

    Add the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self._AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

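  # Example (illustrative sketch): lower numbers mean higher priority, so
  # with pending opcode priorities [0, -10, 19] this returns -10 and the job
  # is scheduled at the priority of its most urgent unfinished opcode.
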
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

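  # Example (illustrative sketch): with recorded log serials 1..3,
  # GetLogEntries(newer_than=1) returns only the entries with serials 2 and
  # 3, while GetLogEntries(None) returns all of them.
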
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
        as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
        and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Update priority of pending opcode
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._AppendFeedback(timestamp, log_type, log_msg)

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    # pyinotify's check_events() expects a timeout in milliseconds
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: the inotify-based waiter is only created on the second
    # call; the first call always reports a change so the caller checks the
    # job at least once before starting to wait.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to
      # change again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeError(to_encode)


class _TimeoutStrategyWrapper(object):
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext(object):
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy of the opcode's dependency list
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

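  # Example (illustrative sketch): an opcode submitted at priority 0 that
  # keeps failing to acquire its locks is bumped to -1, -2, ... until it
  # reaches constants.OP_PRIO_HIGHEST, each bump resetting the timeout
  # strategy.
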
  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for
    # future lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one
    # for pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

1100
1102 """Processes one opcode and returns the result.
1103
1104 """
1105 op = opctx.op
1106
1107 assert op.status in (constants.OP_STATUS_WAITING,
1108 constants.OP_STATUS_CANCELING)
1109
1110
1111 if op.status == constants.OP_STATUS_CANCELING:
1112 return (constants.OP_STATUS_CANCELING, None)
1113
1114 timeout = opctx.GetNextLockTimeout()
1115
1116 try:
1117
1118 result = self.opexec_fn(op.input,
1119 _OpExecCallbacks(self.queue, self.job, op),
1120 timeout=timeout)
1121 except mcpu.LockAcquireTimeout:
1122 assert timeout is not None, "Received timeout for blocking acquire"
1123 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1124
1125 assert op.status in (constants.OP_STATUS_WAITING,
1126 constants.OP_STATUS_CANCELING)
1127
1128
1129 if op.status == constants.OP_STATUS_CANCELING:
1130 return (constants.OP_STATUS_CANCELING, None)
1131
1132
1133 if not self.queue.AcceptingJobsUnlocked():
1134 return (constants.OP_STATUS_QUEUED, None)
1135
1136
1137 return (constants.OP_STATUS_WAITING, None)
1138 except CancelJob:
1139 logging.exception("%s: Canceling job", opctx.log_prefix)
1140 assert op.status == constants.OP_STATUS_CANCELING
1141 return (constants.OP_STATUS_CANCELING, None)
1142
1143 except QueueShutdown:
1144 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1145
1146 assert op.status == constants.OP_STATUS_WAITING
1147
1148
1149 return (constants.OP_STATUS_QUEUED, None)
1150
1151 except Exception, err:
1152 logging.exception("%s: Caught exception in %s",
1153 opctx.log_prefix, opctx.summary)
1154 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1155 else:
1156 logging.debug("%s: %s successful",
1157 opctx.log_prefix, opctx.summary)
1158 return (constants.OP_STATUS_SUCCESS, result)
1159
1161 """Continues execution of a job.
1162
1163 @param _nextop_fn: Callback function for tests
1164 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1165 be deferred and C{WAITDEP} if the dependency manager
1166 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1167
1168 """
1169 queue = self.queue
1170 job = self.job
1171
1172 logging.debug("Processing job %s", job.id)
1173
1174 queue.acquire(shared=1)
1175 try:
1176 opcount = len(job.ops)
1177
1178 assert job.writable, "Expected writable job"
1179
1180
1181 if job.CalcStatus() in constants.JOBS_FINALIZED:
1182 return self.FINISHED
1183
1184
1185 if job.cur_opctx:
1186 opctx = job.cur_opctx
1187 job.cur_opctx = None
1188 else:
1189 if __debug__ and _nextop_fn:
1190 _nextop_fn()
1191 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1192
1193 op = opctx.op
1194
1195
1196 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1197 constants.OP_STATUS_CANCELING)
1198 for i in job.ops[opctx.index + 1:])
1199
1200 assert op.status in (constants.OP_STATUS_QUEUED,
1201 constants.OP_STATUS_WAITING,
1202 constants.OP_STATUS_CANCELING)
1203
1204 assert (op.priority <= constants.OP_PRIO_LOWEST and
1205 op.priority >= constants.OP_PRIO_HIGHEST)
1206
1207 waitjob = None
1208
1209 if op.status != constants.OP_STATUS_CANCELING:
1210 assert op.status in (constants.OP_STATUS_QUEUED,
1211 constants.OP_STATUS_WAITING)
1212
1213
1214 if self._MarkWaitlock(job, op):
1215
1216 queue.UpdateJobUnlocked(job)
1217
1218 assert op.status == constants.OP_STATUS_WAITING
1219 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1220 assert job.start_timestamp and op.start_timestamp
1221 assert waitjob is None
1222
1223
1224 waitjob = self._CheckDependencies(queue, job, opctx)
1225
1226 assert op.status in (constants.OP_STATUS_WAITING,
1227 constants.OP_STATUS_CANCELING,
1228 constants.OP_STATUS_ERROR)
1229
1230 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1231 constants.OP_STATUS_ERROR)):
1232 logging.info("%s: opcode %s waiting for locks",
1233 opctx.log_prefix, opctx.summary)
1234
1235 assert not opctx.jobdeps, "Not all dependencies were removed"
1236
1237 queue.release()
1238 try:
1239 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1240 finally:
1241 queue.acquire(shared=1)
1242
1243 op.status = op_status
1244 op.result = op_result
1245
1246 assert not waitjob
1247
1248 if op.status in (constants.OP_STATUS_WAITING,
1249 constants.OP_STATUS_QUEUED):
1250
1251
1252 assert not op.end_timestamp
1253 else:
1254
1255 op.end_timestamp = TimeStampNow()
1256
1257 if op.status == constants.OP_STATUS_CANCELING:
1258 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1259 for i in job.ops[opctx.index:])
1260 else:
1261 assert op.status in constants.OPS_FINALIZED
1262
1263 if op.status == constants.OP_STATUS_QUEUED:
1264
1265 assert not waitjob
1266
1267 finalize = False
1268
1269
1270 job.cur_opctx = None
1271
1272
1273 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1274
1275 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1276 finalize = False
1277
1278 if not waitjob and opctx.CheckPriorityIncrease():
1279
1280 queue.UpdateJobUnlocked(job)
1281
1282
1283 job.cur_opctx = opctx
1284
1285 assert (op.priority <= constants.OP_PRIO_LOWEST and
1286 op.priority >= constants.OP_PRIO_HIGHEST)
1287
1288
1289 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1290
1291 else:
1292
1293 assert (opctx.index == 0 or
1294 compat.all(i.status == constants.OP_STATUS_SUCCESS
1295 for i in job.ops[:opctx.index]))
1296
1297
1298 job.cur_opctx = None
1299
1300 if op.status == constants.OP_STATUS_SUCCESS:
1301 finalize = False
1302
1303 elif op.status == constants.OP_STATUS_ERROR:
1304
1305 assert errors.GetEncodedError(job.ops[opctx.index].result)
1306
1307 to_encode = errors.OpExecError("Preceding opcode failed")
1308 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1309 _EncodeOpError(to_encode))
1310 finalize = True
1311
1312
1313 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1314 errors.GetEncodedError(i.result)
1315 for i in job.ops[opctx.index:])
1316
1317 elif op.status == constants.OP_STATUS_CANCELING:
1318 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1319 "Job canceled by request")
1320 finalize = True
1321
1322 else:
1323 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1324
1325 if opctx.index == (opcount - 1):
1326
1327 finalize = True
1328
1329 if finalize:
1330
1331 job.Finalize()
1332
1333
1334
1335 queue.UpdateJobUnlocked(job)
1336
1337 assert not waitjob
1338
1339 if finalize:
1340 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1341 return self.FINISHED
1342
1343 assert not waitjob or queue.depmgr.JobWaiting(job)
1344
1345 if waitjob:
1346 return self.WAITDEP
1347 else:
1348 return self.DEFER
1349 finally:
1350 assert job.writable, "Job became read-only while being processed"
1351 queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers
    # for a dependency job, and the other job notifies before the first
    # worker is done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, args, kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)

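  # Example (illustrative sketch): for job 123 the worker thread is named
  # "Job123" between opcodes and "Job123/<tiny summary>" while an opcode
  # runs, where the suffix comes from OpCode.TinySummary().
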
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager(object):
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested):
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up
    # as one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
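
  # Example (illustrative sketch): for a job depending on
  # [(99, [JOB_STATUS_SUCCESS])]: job 99 unfinished -> (WAIT, ...) and the
  # caller is re-queued later; finished successfully -> (CONTINUE, ...);
  # cancelled -> (CANCEL, ...); finished otherwise -> (WRONGSTATUS, ...).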
1563
1565 """Remove all jobs without actual waiters.
1566
1567 """
1568 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1569 if not waiters]:
1570 del self._waiters[job_id]
1571
1573 """Notifies all jobs waiting for a certain job ID.
1574
1575 @attention: Do not call until L{CheckAndRegister} returned a status other
1576 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1577 @type job_id: int
1578 @param job_id: Job ID
1579
1580 """
1581 assert ht.TJobId(job_id)
1582
1583 self._lock.acquire()
1584 try:
1585 self._RemoveEmptyWaitersUnlocked()
1586
1587 jobs = self._waiters.pop(job_id, None)
1588 finally:
1589 self._lock.release()
1590
1591 if jobs:
1592
1593 logging.debug("Re-adding %s jobs which were waiting for job %s",
1594 len(jobs), job_id)
1595 self._enqueue_fn(jobs)
1596
1599 """Decorator for "public" functions.
1600
1601 This function should be used for all 'public' functions. That is,
1602 functions usually called from other classes. Note that this should
1603 be applied only to methods (not plain functions), since it expects
1604 that the decorated function is called with a first argument that has
1605 a '_queue_filelock' argument.
1606
1607 @warning: Use this decorator only after locking.ssynchronized
1608
1609 Example::
1610 @locking.ssynchronized(_LOCK)
1611 @_RequireOpenQueue
1612 def Example(self):
1613 pass
1614
1615 """
1616 def wrapper(self, *args, **kwargs):
1617
1618 assert self._queue_filelock is not None, "Queue should be open"
1619 return fn(self, *args, **kwargs)
1620 return wrapper
1621
1624 """Decorator checking for a non-drained queue.
1625
1626 To be used with functions submitting new jobs.
1627
1628 """
1629 def wrapper(self, *args, **kwargs):
1630 """Wrapper function.
1631
1632 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1633
1634 """
1635
1636
1637
1638 if self._drained:
1639 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1640
1641 if not self._accepting_jobs:
1642 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1643
1644 return fn(self, *args, **kwargs)
1645 return wrapper
1646
1649 """Queue used to manage the jobs.
1650
1651 """
1653 """Constructor for JobQueue.
1654
1655 The constructor will initialize the job queue object and then
1656 start loading the current jobs from disk, either for starting them
1657 (if they were queue) or for aborting them (if they were already
1658 running).
1659
1660 @type context: GanetiContext
1661 @param context: the context object for access to the configuration
1662 data and other ganeti objects
1663
1664 """
1665 self.context = context
1666 self._memcache = weakref.WeakValueDictionary()
1667 self._my_hostname = netutils.Hostname.GetSysName()
1668
1669
1670
1671
1672
1673
1674 self._lock = locking.SharedLock("JobQueue")
1675
1676 self.acquire = self._lock.acquire
1677 self.release = self._lock.release
1678
1679
1680 self._accepting_jobs = True
1681
1682
1683
1684 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1685
1686
1687 self._last_serial = jstore.ReadSerial()
1688 assert self._last_serial is not None, ("Serial file was modified between"
1689 " check in jstore and here")
1690
1691
1692 self._nodes = dict((n.name, n.primary_ip)
1693 for n in self.context.cfg.GetAllNodesInfo().values()
1694 if n.master_candidate)
1695
1696
1697 self._nodes.pop(self._my_hostname, None)
1698
1699
1700
1701 self._queue_size = None
1702 self._UpdateQueueSizeUnlocked()
1703 assert ht.TInt(self._queue_size)
1704 self._drained = jstore.CheckDrainFlag()
1705
1706
1707 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1708 self._EnqueueJobs)
1709 self.context.glm.AddToLockMonitor(self.depmgr)
1710
1711
1712 self._wpool = _JobQueueWorkerPool(self)
1713 try:
1714 self._InspectQueue()
1715 except:
1716 self._wpool.TerminateWorkers()
1717 raise
1718
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          to_encode = errors.OpExecError("Unclean master daemon shutdown")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

1776
1778 """Gets RPC runner with context.
1779
1780 """
1781 return rpc.JobQueueRunner(self.context, address_list)
1782
1783 @locking.ssynchronized(_LOCK)
1784 @_RequireOpenQueue
1786 """Register a new node with the queue.
1787
1788 @type node: L{objects.Node}
1789 @param node: the node object to be added
1790
1791 """
1792 node_name = node.name
1793 assert node_name != self._my_hostname
1794
1795
1796 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1797 msg = result.fail_msg
1798 if msg:
1799 logging.warning("Cannot cleanup queue directory on node %s: %s",
1800 node_name, msg)
1801
1802 if not node.master_candidate:
1803
1804 self._nodes.pop(node_name, None)
1805
1806 return
1807
1808
1809 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1810
1811
1812 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1813
1814
1815 addrs = [node.primary_ip]
1816
1817 for file_name in files:
1818
1819 content = utils.ReadFile(file_name)
1820
1821 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1822 file_name, content)
1823 msg = result[node_name].fail_msg
1824 if msg:
1825 logging.error("Failed to upload file %s to node %s: %s",
1826 file_name, node_name, msg)
1827
1828
1829 result = \
1830 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1831 self._drained)
1832 msg = result[node_name].fail_msg
1833 if msg:
1834 logging.error("Failed to set queue drained flag on node %s: %s",
1835 node_name, msg)
1836
1837 self._nodes[node_name] = node.primary_ip
1838
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

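  # Example (illustrative sketch): with _last_serial == 42,
  # _NewSerialsUnlocked(3) writes "45\n" to the serial file, sets
  # _last_serial to 45 and returns the formatted job IDs for serials 43, 44
  # and 45.
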
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err:
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

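  # Example (illustrative sketch): when a submitted batch gets job IDs
  # [10, 11, 12], a dependency (-1, [...]) in the third job resolves to 11,
  # since resolve_fn picks (previous_job_ids + job_ids[:idx])[-1].
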
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was modified, write it to disk and replicate to other
      # nodes
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether renaming the files
    # succeeded or failed, we update the cached queue size from the
    # filesystem. When the TODO above is addressed, the number of actually
    # archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving jobs, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

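  # Example (illustrative sketch): AutoArchiveJobs(age=3600, timeout=10)
  # archives finalized jobs whose end (or start, or received) timestamp is
  # more than an hour old, stopping after roughly ten seconds; age == -1
  # archives all finalized jobs regardless of age.
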
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's
      # no risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
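
  # Illustrative shutdown sequence (sketch): the caller is expected to poll
  # PrepareShutdown() until it returns False (no more running jobs) and only
  # then call Shutdown() to terminate the workers and close the queue.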
2707
2709 """Returns whether jobs are accepted.
2710
2711 Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2712 queue is shutting down.
2713
2714 @rtype: bool
2715
2716 """
2717 return self._accepting_jobs
2718
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None