22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40
41 from pyinotify import pyinotify
42 except ImportError:
43 import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59
60
61 JOBQUEUE_THREADS = 25
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63
64
65 _LOCK = "_lock"
66 _QUEUE = "_queue"
70 """Special exception to cancel a job.
71
72 """
73
76 """Returns the current timestamp.
77
78 @rtype: tuple
79 @return: the current time in the (seconds, microseconds) format
80
81 """
82 return utils.SplitTime(time.time())
83
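# Illustrative sketch, not part of the original module: the queue stores all
# timestamps as (seconds, microseconds) tuples rather than plain floats.
# Assuming utils.SplitTime behaves as documented above, a tuple produced by
# TimeStampNow() can be turned back into a float like this (the helper name
# is hypothetical):
def _ExampleTimestampToFloat(ts):
  """Converts a (seconds, microseconds) tuple back into a float timestamp."""
  (seconds, micros) = ts
  return seconds + micros / 1000000.0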
86 """Encapsulates an opcode object.
87
88 @ivar log: holds the execution log and consists of tuples
89 of the form C{(log_serial, timestamp, level, message)}
90 @ivar input: the OpCode we encapsulate
91 @ivar status: the current status
92 @ivar result: the result of the LU execution
93 @ivar start_timestamp: timestamp for the start of the execution
94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 @ivar stop_timestamp: timestamp for the end of the execution
96
97 """
98 __slots__ = ["input", "status", "result", "log", "priority",
99 "start_timestamp", "exec_timestamp", "end_timestamp",
100 "__weakref__"]
101
103 """Constructor for the _QuededOpCode.
104
105 @type op: L{opcodes.OpCode}
106 @param op: the opcode we encapsulate
107
108 """
109 self.input = op
110 self.status = constants.OP_STATUS_QUEUED
111 self.result = None
112 self.log = []
113 self.start_timestamp = None
114 self.exec_timestamp = None
115 self.end_timestamp = None
116
117
118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119
120 @classmethod
122 """Restore the _QueuedOpCode from the serialized form.
123
124 @type state: dict
125 @param state: the serialized state
126 @rtype: _QueuedOpCode
127 @return: a new _QueuedOpCode instance
128
129 """
130 obj = _QueuedOpCode.__new__(cls)
131 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 obj.status = state["status"]
133 obj.result = state["result"]
134 obj.log = state["log"]
135 obj.start_timestamp = state.get("start_timestamp", None)
136 obj.exec_timestamp = state.get("exec_timestamp", None)
137 obj.end_timestamp = state.get("end_timestamp", None)
138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139 return obj
140
142 """Serializes this _QueuedOpCode.
143
144 @rtype: dict
145 @return: the dictionary holding the serialized state
146
147 """
148 return {
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
152 "log": self.log,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,
157 }
158
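# Illustrative sketch, not part of the original module: Serialize() and
# Restore() are meant to be inverses, so a queued opcode can be dumped to the
# JSON job file and re-created later.  A hedged round-trip example, assuming
# "op" is any opcodes.OpCode instance (the helper name is hypothetical):
def _ExampleOpCodeRoundTrip(op):
  """Serializes a _QueuedOpCode and restores an equivalent copy."""
  queued = _QueuedOpCode(op)
  state = queued.Serialize()            # plain, JSON-serializable dict
  restored = _QueuedOpCode.Restore(state)
  return restored.status == queued.status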
161 """In-memory job representation.
162
163 This is what we use to track the user-submitted jobs. Locking must
164 be taken care of by users of this class.
165
166 @type queue: L{JobQueue}
167 @ivar queue: the parent queue
168 @ivar id: the job ID
169 @type ops: list
170 @ivar ops: the list of _QueuedOpCode that constitute the job
171 @type log_serial: int
172 @ivar log_serial: holds the index for the next log entry
173 @ivar received_timestamp: the timestamp for when the job was received
174 @ivar start_timestamp: the timestamp for start of execution
175 @ivar end_timestamp: the timestamp for end of execution
176
177 """
178
179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 "received_timestamp", "start_timestamp", "end_timestamp",
181 "__weakref__"]
182
183 def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob.
185
186 @type queue: L{JobQueue}
187 @param queue: our parent queue
188 @type job_id: job_id
189 @param job_id: our job id
190 @type ops: list
191 @param ops: the list of opcodes we hold, which will be encapsulated
192 in _QueuedOpCodes
193
194 """
195 if not ops:
196 raise errors.GenericError("A job needs at least one opcode")
197
198 self.queue = queue
199 self.id = job_id
200 self.ops = [_QueuedOpCode(op) for op in ops]
201 self.log_serial = 0
202 self.received_timestamp = TimeStampNow()
203 self.start_timestamp = None
204 self.end_timestamp = None
205
206 self._InitInMemory(self)
207
208 @staticmethod
210 """Initializes in-memory variables.
211
212 """
213 obj.ops_iter = None
214 obj.cur_opctx = None
215
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218 "id=%s" % self.id,
219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220
221 return "<%s at %#x>" % (" ".join(status), id(self))
222
223 @classmethod
225 """Restore a _QueuedJob from serialized state:
226
227 @type queue: L{JobQueue}
228 @param queue: to which queue the restored job belongs
229 @type state: dict
230 @param state: the serialized state
231 @rtype: _QueuedJob
232 @return: the restored _QueuedJob instance
233
234 """
235 obj = _QueuedJob.__new__(cls)
236 obj.queue = queue
237 obj.id = state["id"]
238 obj.received_timestamp = state.get("received_timestamp", None)
239 obj.start_timestamp = state.get("start_timestamp", None)
240 obj.end_timestamp = state.get("end_timestamp", None)
241
242 obj.ops = []
243 obj.log_serial = 0
244 for op_state in state["ops"]:
245 op = _QueuedOpCode.Restore(op_state)
246 for log_entry in op.log:
247 obj.log_serial = max(obj.log_serial, log_entry[0])
248 obj.ops.append(op)
249
250 cls._InitInMemory(obj)
251
252 return obj
253
255 """Serialize the _JobQueue instance.
256
257 @rtype: dict
258 @return: the serialized state
259
260 """
261 return {
262 "id": self.id,
263 "ops": [op.Serialize() for op in self.ops],
264 "start_timestamp": self.start_timestamp,
265 "end_timestamp": self.end_timestamp,
266 "received_timestamp": self.received_timestamp,
267 }
268
321
323 """Gets the current priority for this job.
324
325 Only unfinished opcodes are considered. When all are done, the default
326 priority is used.
327
328 @rtype: int
329
330 """
331 priorities = [op.priority for op in self.ops
332 if op.status not in constants.OPS_FINALIZED]
333
334 if not priorities:
335
336 return constants.OP_PRIO_DEFAULT
337
338 return min(priorities)
339
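# Illustrative sketch, not part of the original module: the job priority is
# the smallest (most urgent) value among the opcodes that are not finalized
# yet, falling back to the default when everything is done.  The same logic on
# plain values (standalone helper, name is hypothetical):
def _ExampleCalcPriority(op_priorities, finalized_flags):
  """Returns the min priority of unfinished ops, or the default if none left."""
  pending = [prio for (prio, done) in zip(op_priorities, finalized_flags)
             if not done]
  if not pending:
    return constants.OP_PRIO_DEFAULT
  return min(pending)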
341 """Selectively returns the log entries.
342
343 @type newer_than: None or int
344 @param newer_than: if this is None, return all log entries,
345 otherwise return only the log entries with serial higher
346 than this value
347 @rtype: list
348 @return: the list of the log entries selected
349
350 """
351 if newer_than is None:
352 serial = -1
353 else:
354 serial = newer_than
355
356 entries = []
357 for op in self.ops:
358 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359
360 return entries
361
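# Illustrative sketch, not part of the original module: log entries are tuples
# whose first element is a strictly increasing serial, so a client that has
# already seen serial N only receives newer entries.  The filtering step in
# isolation (standalone helper, name is hypothetical):
def _ExampleFilterLogEntries(entries, newer_than):
  """Keeps only the log tuples whose serial is greater than newer_than."""
  if newer_than is None:
    serial = -1
  else:
    serial = newer_than
  return [entry for entry in entries if entry[0] > serial]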
363 """Returns information about a job.
364
365 @type fields: list
366 @param fields: names of fields to return
367 @rtype: list
368 @return: list with one element for each field
369 @raise errors.OpExecError: when an invalid field
370 has been passed
371
372 """
373 row = []
374 for fname in fields:
375 if fname == "id":
376 row.append(self.id)
377 elif fname == "status":
378 row.append(self.CalcStatus())
379 elif fname == "priority":
380 row.append(self.CalcPriority())
381 elif fname == "ops":
382 row.append([op.input.__getstate__() for op in self.ops])
383 elif fname == "opresult":
384 row.append([op.result for op in self.ops])
385 elif fname == "opstatus":
386 row.append([op.status for op in self.ops])
387 elif fname == "oplog":
388 row.append([op.log for op in self.ops])
389 elif fname == "opstart":
390 row.append([op.start_timestamp for op in self.ops])
391 elif fname == "opexec":
392 row.append([op.exec_timestamp for op in self.ops])
393 elif fname == "opend":
394 row.append([op.end_timestamp for op in self.ops])
395 elif fname == "oppriority":
396 row.append([op.priority for op in self.ops])
397 elif fname == "received_ts":
398 row.append(self.received_timestamp)
399 elif fname == "start_ts":
400 row.append(self.start_timestamp)
401 elif fname == "end_ts":
402 row.append(self.end_timestamp)
403 elif fname == "summary":
404 row.append([op.input.Summary() for op in self.ops])
405 else:
406 raise errors.OpExecError("Invalid self query field '%s'" % fname)
407 return row
408
410 """Mark unfinished opcodes with a given status and result.
411
412 This is a utility function for marking all running or waiting to
413 be run opcodes with a given status. Opcodes which are already
414 finalized are not changed.
415
416 @param status: a given opcode status
417 @param result: the opcode result
418
419 """
420 not_marked = True
421 for op in self.ops:
422 if op.status in constants.OPS_FINALIZED:
423 assert not_marked, "Finalized opcodes found after non-finalized ones"
424 continue
425 op.status = status
426 op.result = result
427 not_marked = False
428
430 """Marks the job as finalized.
431
432 """
433 self.end_timestamp = TimeStampNow()
434
459
463 """Initializes this class.
464
465 @type queue: L{JobQueue}
466 @param queue: Job queue
467 @type job: L{_QueuedJob}
468 @param job: Job object
469 @type op: L{_QueuedOpCode}
470 @param op: OpCode
471
472 """
473 assert queue, "Queue is missing"
474 assert job, "Job is missing"
475 assert op, "Opcode is missing"
476
477 self._queue = queue
478 self._job = job
479 self._op = op
480
482 """Raises an exception to cancel the job if asked to.
483
484 """
485
486 if self._op.status == constants.OP_STATUS_CANCELING:
487 logging.debug("Canceling opcode")
488 raise CancelJob()
489
490 @locking.ssynchronized(_QUEUE, shared=1)
492 """Mark the opcode as running, not lock-waiting.
493
494 This is called from the mcpu code as a notifier function, when the LU is
495 finally about to start the Exec() method. Of course, to have end-user
496 visible results, the opcode must be initially (before calling into
497 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
498
499 """
500 assert self._op in self._job.ops
501 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
502 constants.OP_STATUS_CANCELING)
503
504
505 self._CheckCancel()
506
507 logging.debug("Opcode is now running")
508
509 self._op.status = constants.OP_STATUS_RUNNING
510 self._op.exec_timestamp = TimeStampNow()
511
512
513 self._queue.UpdateJobUnlocked(self._job)
514
515 @locking.ssynchronized(_QUEUE, shared=1)
517 """Internal feedback append function, with locks
518
519 """
520 self._job.log_serial += 1
521 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522 self._queue.UpdateJobUnlocked(self._job, replicate=False)
523
540
550
553 def __init__(self, fields, prev_job_info, prev_log_serial):
554 """Initializes this class.
555
556 @type fields: list of strings
557 @param fields: Fields requested by LUXI client
558 @type prev_job_info: string
559 @param prev_job_info: previous job info, as passed by the LUXI client
560 @type prev_log_serial: string
561 @param prev_log_serial: previous job serial, as passed by the LUXI client
562
563 """
564 self._fields = fields
565 self._prev_job_info = prev_job_info
566 self._prev_log_serial = prev_log_serial
567
602
606 """Initializes this class.
607
608 @type filename: string
609 @param filename: Path to job file
610 @raises errors.InotifyError: if the notifier cannot be setup
611
612 """
613 self._wm = pyinotify.WatchManager()
614 self._inotify_handler = \
615 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
616 self._notifier = \
617 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
618 try:
619 self._inotify_handler.enable()
620 except Exception:
621
622 self._notifier.stop()
623 raise
624
626 """Callback for inotify.
627
628 """
629 if not notifier_enabled:
630 self._inotify_handler.enable()
631
632 def Wait(self, timeout):
633 """Waits for the job file to change.
634
635 @type timeout: float
636 @param timeout: Timeout in seconds
637 @return: Whether there have been events
638
639 """
640 assert timeout >= 0
641 have_events = self._notifier.check_events(timeout * 1000)
642 if have_events:
643 self._notifier.read_events()
644 self._notifier.process_events()
645 return have_events
646
648 """Closes underlying notifier and its file descriptor.
649
650 """
651 self._notifier.stop()
652
656 """Initializes this class.
657
658 @type filename: string
659 @param filename: Path to job file
660
661 """
662 self._filewaiter = None
663 self._filename = filename
664
665 def Wait(self, timeout):
666 """Waits for a job to change.
667
668 @type timeout: float
669 @param timeout: Timeout in seconds
670 @return: Whether there have been events
671
672 """
673 if self._filewaiter:
674 return self._filewaiter.Wait(timeout)
675
676
677
678
679
680 self._filewaiter = _JobFileChangesWaiter(self._filename)
681
682 return True
683
685 """Closes underlying waiter.
686
687 """
688 if self._filewaiter:
689 self._filewaiter.Close()
690
693 """Helper class using inotify to wait for changes in a job file.
694
695 This class takes a previous job status and serial, and alerts the client when
696 the current job status has changed.
697
698 """
699 @staticmethod
701 job = job_load_fn()
702 if not job:
703 raise errors.JobLost()
704
705 result = check_fn(job)
706 if result is None:
707 raise utils.RetryAgain()
708
709 return result
710
711 def __call__(self, filename, job_load_fn,
712 fields, prev_job_info, prev_log_serial, timeout):
713 """Waits for changes on a job.
714
715 @type filename: string
716 @param filename: File on which to wait for changes
717 @type job_load_fn: callable
718 @param job_load_fn: Function to load job
719 @type fields: list of strings
720 @param fields: Which fields to check for changes
721 @type prev_job_info: list or None
722 @param prev_job_info: Last job information returned
723 @type prev_log_serial: int
724 @param prev_log_serial: Last job message serial number
725 @type timeout: float
726 @param timeout: maximum time to wait in seconds
727
728 """
729 try:
730 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
731 waiter = _JobChangesWaiter(filename)
732 try:
733 return utils.Retry(compat.partial(self._CheckForChanges,
734 job_load_fn, check_fn),
735 utils.RETRY_REMAINING_TIME, timeout,
736 wait_fn=waiter.Wait)
737 finally:
738 waiter.Close()
739 except (errors.InotifyError, errors.JobLost):
740 return None
741 except utils.RetryTimeout:
742 return constants.JOB_NOTCHANGED
743
755
759 """Initializes this class.
760
761 """
762 self._fn = fn
763 self._next = None
764
766 """Gets the next timeout if necessary.
767
768 """
769 if self._next is None:
770 self._next = self._fn()
771
773 """Returns the next timeout.
774
775 """
776 self._Advance()
777 return self._next
778
780 """Returns the current timeout and advances the internal state.
781
782 """
783 self._Advance()
784 result = self._next
785 self._next = None
786 return result
787
788
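# Illustrative sketch, not part of the original module: the wrapper above
# caches the value produced by its timeout callback, so Peek() can look at the
# next timeout without consuming it, while Next() consumes it.  Rough usage,
# assuming "next_attempt_fn" yields the lock-acquire timeouts of a strategy
# (the helper name is hypothetical):
def _ExampleTimeoutWrapperUsage(next_attempt_fn):
  """Shows that Peek() does not consume the cached timeout while Next() does."""
  wrapper = _TimeoutStrategyWrapper(next_attempt_fn)
  peeked = wrapper.Peek()   # inspects the upcoming timeout
  taken = wrapper.Next()    # same value, now consumed and reset
  return peeked == taken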
789 class _OpExecContext:
790 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
791 """Initializes this class.
792
793 """
794 self.op = op
795 self.index = index
796 self.log_prefix = log_prefix
797 self.summary = op.input.Summary()
798
799 self._timeout_strategy_factory = timeout_strategy_factory
800 self._ResetTimeoutStrategy()
801
803 """Creates a new timeout strategy.
804
805 """
806 self._timeout_strategy = \
807 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
808
810 """Checks whether priority can and should be increased.
811
812 Called when locks couldn't be acquired.
813
814 """
815 op = self.op
816
817
818
819 if (self._timeout_strategy.Peek() is None and
820 op.priority > constants.OP_PRIO_HIGHEST):
821 logging.debug("Increasing priority")
822 op.priority -= 1
823 self._ResetTimeoutStrategy()
824 return True
825
826 return False
827
829 """Returns the next lock acquire timeout.
830
831 """
832 return self._timeout_strategy.Next()
833
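# Illustrative sketch, not part of the original module: opcode priorities are
# numeric and lower values are more urgent, so CheckPriorityIncrease() above
# "increases" the priority by decrementing the number, never going past
# OP_PRIO_HIGHEST.  The same idea on a bare value (standalone helper, name is
# hypothetical):
def _ExampleBumpPriority(priority):
  """Returns a more urgent priority, clamped at constants.OP_PRIO_HIGHEST."""
  if priority > constants.OP_PRIO_HIGHEST:
    return priority - 1
  return priority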
838 """Initializes this class.
839
840 """
841 self.queue = queue
842 self.opexec_fn = opexec_fn
843 self.job = job
844 self._timeout_strategy_factory = _timeout_strategy_factory
845
846 @staticmethod
848 """Locates the next opcode to run.
849
850 @type job: L{_QueuedJob}
851 @param job: Job object
852 @param timeout_strategy_factory: Callable to create new timeout strategy
853
854 """
855
856
857
858
859 if job.ops_iter is None:
860 job.ops_iter = enumerate(job.ops)
861
862
863 while True:
864 try:
865 (idx, op) = job.ops_iter.next()
866 except StopIteration:
867 raise errors.ProgrammerError("Called for a finished job")
868
869 if op.status == constants.OP_STATUS_RUNNING:
870
871 raise errors.ProgrammerError("Called for job marked as running")
872
873 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
874 timeout_strategy_factory)
875
876 if op.status not in constants.OPS_FINALIZED:
877 return opctx
878
879
880
881
882
883 logging.info("%s: opcode %s already processed, skipping",
884 opctx.log_prefix, opctx.summary)
885
886 @staticmethod
888 """Marks an opcode as waiting for locks.
889
890 The job's start timestamp is also set if necessary.
891
892 @type job: L{_QueuedJob}
893 @param job: Job object
894 @type op: L{_QueuedOpCode}
895 @param op: Opcode object
896
897 """
898 assert op in job.ops
899 assert op.status in (constants.OP_STATUS_QUEUED,
900 constants.OP_STATUS_WAITLOCK)
901
902 update = False
903
904 op.result = None
905
906 if op.status == constants.OP_STATUS_QUEUED:
907 op.status = constants.OP_STATUS_WAITLOCK
908 update = True
909
910 if op.start_timestamp is None:
911 op.start_timestamp = TimeStampNow()
912 update = True
913
914 if job.start_timestamp is None:
915 job.start_timestamp = op.start_timestamp
916 update = True
917
918 assert op.status == constants.OP_STATUS_WAITLOCK
919
920 return update
921
923 """Processes one opcode and returns the result.
924
925 """
926 op = opctx.op
927
928 assert op.status == constants.OP_STATUS_WAITLOCK
929
930 timeout = opctx.GetNextLockTimeout()
931
932 try:
933
934 result = self.opexec_fn(op.input,
935 _OpExecCallbacks(self.queue, self.job, op),
936 timeout=timeout, priority=op.priority)
937 except mcpu.LockAcquireTimeout:
938 assert timeout is not None, "Received timeout for blocking acquire"
939 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
940
941 assert op.status in (constants.OP_STATUS_WAITLOCK,
942 constants.OP_STATUS_CANCELING)
943
944
945 if op.status == constants.OP_STATUS_CANCELING:
946 return (constants.OP_STATUS_CANCELING, None)
947
948
949 return (constants.OP_STATUS_WAITLOCK, None)
950 except CancelJob:
951 logging.exception("%s: Canceling job", opctx.log_prefix)
952 assert op.status == constants.OP_STATUS_CANCELING
953 return (constants.OP_STATUS_CANCELING, None)
954 except Exception, err:
955 logging.exception("%s: Caught exception in %s",
956 opctx.log_prefix, opctx.summary)
957 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
958 else:
959 logging.debug("%s: %s successful",
960 opctx.log_prefix, opctx.summary)
961 return (constants.OP_STATUS_SUCCESS, result)
962
964 """Continues execution of a job.
965
966 @param _nextop_fn: Callback function for tests
967 @rtype: bool
968 @return: True if job is finished, False if processor needs to be called
969 again
970
971 """
972 queue = self.queue
973 job = self.job
974
975 logging.debug("Processing job %s", job.id)
976
977 queue.acquire(shared=1)
978 try:
979 opcount = len(job.ops)
980
981
982 if job.CalcStatus() in constants.JOBS_FINALIZED:
983 return True
984
985
986 if job.cur_opctx:
987 opctx = job.cur_opctx
988 job.cur_opctx = None
989 else:
990 if __debug__ and _nextop_fn:
991 _nextop_fn()
992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993
994 op = opctx.op
995
996
997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998 constants.OP_STATUS_CANCELING)
999 for i in job.ops[opctx.index + 1:])
1000
1001 assert op.status in (constants.OP_STATUS_QUEUED,
1002 constants.OP_STATUS_WAITLOCK,
1003 constants.OP_STATUS_CANCELING)
1004
1005 assert (op.priority <= constants.OP_PRIO_LOWEST and
1006 op.priority >= constants.OP_PRIO_HIGHEST)
1007
1008 if op.status != constants.OP_STATUS_CANCELING:
1009 assert op.status in (constants.OP_STATUS_QUEUED,
1010 constants.OP_STATUS_WAITLOCK)
1011
1012
1013 if self._MarkWaitlock(job, op):
1014
1015 queue.UpdateJobUnlocked(job)
1016
1017 assert op.status == constants.OP_STATUS_WAITLOCK
1018 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1019 assert job.start_timestamp and op.start_timestamp
1020
1021 logging.info("%s: opcode %s waiting for locks",
1022 opctx.log_prefix, opctx.summary)
1023
1024 queue.release()
1025 try:
1026 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1027 finally:
1028 queue.acquire(shared=1)
1029
1030 op.status = op_status
1031 op.result = op_result
1032
1033 if op.status == constants.OP_STATUS_WAITLOCK:
1034
1035 assert not op.end_timestamp
1036 else:
1037
1038 op.end_timestamp = TimeStampNow()
1039
1040 if op.status == constants.OP_STATUS_CANCELING:
1041 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1042 for i in job.ops[opctx.index:])
1043 else:
1044 assert op.status in constants.OPS_FINALIZED
1045
1046 if op.status == constants.OP_STATUS_WAITLOCK:
1047 finalize = False
1048
1049 if opctx.CheckPriorityIncrease():
1050
1051 queue.UpdateJobUnlocked(job)
1052
1053
1054 job.cur_opctx = opctx
1055
1056 assert (op.priority <= constants.OP_PRIO_LOWEST and
1057 op.priority >= constants.OP_PRIO_HIGHEST)
1058
1059
1060 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1061
1062 else:
1063
1064 assert (opctx.index == 0 or
1065 compat.all(i.status == constants.OP_STATUS_SUCCESS
1066 for i in job.ops[:opctx.index]))
1067
1068
1069 job.cur_opctx = None
1070
1071 if op.status == constants.OP_STATUS_SUCCESS:
1072 finalize = False
1073
1074 elif op.status == constants.OP_STATUS_ERROR:
1075
1076 assert errors.GetEncodedError(job.ops[opctx.index].result)
1077
1078 to_encode = errors.OpExecError("Preceding opcode failed")
1079 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1080 _EncodeOpError(to_encode))
1081 finalize = True
1082
1083
1084 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1085 errors.GetEncodedError(i.result)
1086 for i in job.ops[opctx.index:])
1087
1088 elif op.status == constants.OP_STATUS_CANCELING:
1089 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090 "Job canceled by request")
1091 finalize = True
1092
1093 else:
1094 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095
1096 if opctx.index == (opcount - 1):
1097
1098 finalize = True
1099
1100 if finalize:
1101
1102 job.Finalize()
1103
1104
1105
1106 queue.UpdateJobUnlocked(job)
1107
1108 if finalize:
1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110 return True
1111
1112 return False
1113 finally:
1114 queue.release()
1115
1118 """The actual job workers.
1119
1120 """
1122 """Job executor.
1123
1124 This functions processes a job. It is closely tied to the L{_QueuedJob} and
1125 L{_QueuedOpCode} classes.
1126
1127 @type job: L{_QueuedJob}
1128 @param job: the job to be processed
1129
1130 """
1131 queue = job.queue
1132 assert queue == self.pool.queue
1133
1134 self.SetTaskName("Job%s" % job.id)
1135
1136 proc = mcpu.Processor(queue.context, job.id)
1137
1138 if not _JobProcessor(queue, proc.ExecOpCode, job)():
1139
1140 raise workerpool.DeferTask(priority=job.CalcPriority())
1141
1144 """Simple class implementing a job-processing workerpool.
1145
1146 """
1152
1155 """Decorator for "public" functions.
1156
1157 This function should be used for all 'public' functions. That is,
1158 functions usually called from other classes. Note that this should
1159 be applied only to methods (not plain functions), since it expects
1160 that the decorated function is called with a first argument that has
1161 a '_queue_filelock' attribute.
1162
1163 @warning: Use this decorator only after locking.ssynchronized
1164
1165 Example::
1166 @locking.ssynchronized(_LOCK)
1167 @_RequireOpenQueue
1168 def Example(self):
1169 pass
1170
1171 """
1172 def wrapper(self, *args, **kwargs):
1173
1174 assert self._queue_filelock is not None, "Queue should be open"
1175 return fn(self, *args, **kwargs)
1176 return wrapper
1177
1180 """Queue used to manage the jobs.
1181
1182 @cvar _RE_JOB_FILE: regex matching the valid job file names
1183
1184 """
1185 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1186
1188 """Constructor for JobQueue.
1189
1190 The constructor will initialize the job queue object and then
1191 start loading the current jobs from disk, either for starting them
1192 (if they were queued) or for aborting them (if they were already
1193 running).
1194
1195 @type context: GanetiContext
1196 @param context: the context object for access to the configuration
1197 data and other ganeti objects
1198
1199 """
1200 self.context = context
1201 self._memcache = weakref.WeakValueDictionary()
1202 self._my_hostname = netutils.Hostname.GetSysName()
1203
1204
1205
1206
1207
1208
1209 self._lock = locking.SharedLock("JobQueue")
1210
1211 self.acquire = self._lock.acquire
1212 self.release = self._lock.release
1213
1214
1215
1216 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1217
1218
1219 self._last_serial = jstore.ReadSerial()
1220 assert self._last_serial is not None, ("Serial file was modified between"
1221 " check in jstore and here")
1222
1223
1224 self._nodes = dict((n.name, n.primary_ip)
1225 for n in self.context.cfg.GetAllNodesInfo().values()
1226 if n.master_candidate)
1227
1228
1229 self._nodes.pop(self._my_hostname, None)
1230
1231
1232
1233 self._queue_size = 0
1234 self._UpdateQueueSizeUnlocked()
1235 self._drained = self._IsQueueMarkedDrain()
1236
1237
1238 self._wpool = _JobQueueWorkerPool(self)
1239 try:
1240 self._InspectQueue()
1241 except:
1242 self._wpool.TerminateWorkers()
1243 raise
1244
1245 @locking.ssynchronized(_LOCK)
1246 @_RequireOpenQueue
1248 """Loads the whole job queue and resumes unfinished jobs.
1249
1250 This function needs the lock here because WorkerPool.AddTask() may start a
1251 job while we're still doing our work.
1252
1253 """
1254 logging.info("Inspecting job queue")
1255
1256 restartjobs = []
1257
1258 all_job_ids = self._GetJobIDsUnlocked()
1259 jobs_count = len(all_job_ids)
1260 lastinfo = time.time()
1261 for idx, job_id in enumerate(all_job_ids):
1262
1263 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1264 idx == (jobs_count - 1)):
1265 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1266 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1267 lastinfo = time.time()
1268
1269 job = self._LoadJobUnlocked(job_id)
1270
1271
1272 if job is None:
1273 continue
1274
1275 status = job.CalcStatus()
1276
1277 if status == constants.JOB_STATUS_QUEUED:
1278 restartjobs.append(job)
1279
1280 elif status in (constants.JOB_STATUS_RUNNING,
1281 constants.JOB_STATUS_WAITLOCK,
1282 constants.JOB_STATUS_CANCELING):
1283 logging.warning("Unfinished job %s found: %s", job.id, job)
1284
1285 if status == constants.JOB_STATUS_WAITLOCK:
1286
1287 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1288 restartjobs.append(job)
1289 else:
1290 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1291 "Unclean master daemon shutdown")
1292 job.Finalize()
1293
1294 self.UpdateJobUnlocked(job)
1295
1296 if restartjobs:
1297 logging.info("Restarting %s jobs", len(restartjobs))
1298 self._EnqueueJobs(restartjobs)
1299
1300 logging.info("Job queue inspection finished")
1301
1302 @locking.ssynchronized(_LOCK)
1303 @_RequireOpenQueue
1305 """Register a new node with the queue.
1306
1307 @type node: L{objects.Node}
1308 @param node: the node object to be added
1309
1310 """
1311 node_name = node.name
1312 assert node_name != self._my_hostname
1313
1314
1315 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1316 msg = result.fail_msg
1317 if msg:
1318 logging.warning("Cannot cleanup queue directory on node %s: %s",
1319 node_name, msg)
1320
1321 if not node.master_candidate:
1322
1323 self._nodes.pop(node_name, None)
1324
1325 return
1326
1327
1328 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1329
1330
1331 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1332
1333 for file_name in files:
1334
1335 content = utils.ReadFile(file_name)
1336
1337 result = rpc.RpcRunner.call_jobqueue_update([node_name],
1338 [node.primary_ip],
1339 file_name, content)
1340 msg = result[node_name].fail_msg
1341 if msg:
1342 logging.error("Failed to upload file %s to node %s: %s",
1343 file_name, node_name, msg)
1344
1345 self._nodes[node_name] = node.primary_ip
1346
1347 @locking.ssynchronized(_LOCK)
1348 @_RequireOpenQueue
1350 """Callback called when removing nodes from the cluster.
1351
1352 @type node_name: str
1353 @param node_name: the name of the node to remove
1354
1355 """
1356 self._nodes.pop(node_name, None)
1357
1358 @staticmethod
1360 """Verifies the status of an RPC call.
1361
1362 Since we aim to keep consistency should this node (the current
1363 master) fail, we will log errors if our rpc calls fail, and especially
1364 log the case when more than half of the nodes fail.
1365
1366 @param result: the data as returned from the rpc call
1367 @type nodes: list
1368 @param nodes: the list of nodes we made the call to
1369 @type failmsg: str
1370 @param failmsg: the identifier to be used for logging
1371
1372 """
1373 failed = []
1374 success = []
1375
1376 for node in nodes:
1377 msg = result[node].fail_msg
1378 if msg:
1379 failed.append(node)
1380 logging.error("RPC call %s (%s) failed on node %s: %s",
1381 result[node].call, failmsg, node, msg)
1382 else:
1383 success.append(node)
1384
1385
1386 if (len(success) + 1) < len(failed):
1387
1388 logging.error("More than half of the nodes failed")
1389
1391 """Helper for returning the node name/ip list.
1392
1393 @rtype: (list, list)
1394 @return: a tuple of two lists, the first one with the node
1395 names and the second one with the node addresses
1396
1397 """
1398
1399 name_list = self._nodes.keys()
1400 addr_list = [self._nodes[name] for name in name_list]
1401 return name_list, addr_list
1402
1404 """Writes a file locally and then replicates it to all nodes.
1405
1406 This function will replace the contents of a file on the local
1407 node and then replicate it to all the other nodes we have.
1408
1409 @type file_name: str
1410 @param file_name: the path of the file to be replicated
1411 @type data: str
1412 @param data: the new contents of the file
1413 @type replicate: boolean
1414 @param replicate: whether to spread the changes to the remote nodes
1415
1416 """
1417 getents = runtime.GetEnts()
1418 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1419 gid=getents.masterd_gid)
1420
1421 if replicate:
1422 names, addrs = self._GetNodeIp()
1423 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1424 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1425
1427 """Renames a file locally and then replicate the change.
1428
1429 This function will rename a file in the local queue directory
1430 and then replicate this rename to all the other nodes we have.
1431
1432 @type rename: list of (old, new)
1433 @param rename: List containing tuples mapping old to new names
1434
1435 """
1436
1437 for old, new in rename:
1438 utils.RenameFile(old, new, mkdir=True)
1439
1440
1441 names, addrs = self._GetNodeIp()
1442 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1443 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1444
1445 @staticmethod
1465
1466 @classmethod
1468 """Returns the archive directory for a job.
1469
1470 @type job_id: str
1471 @param job_id: Job identifier
1472 @rtype: str
1473 @return: Directory name
1474
1475 """
1476 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1477
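# Illustrative sketch, not part of the original module: archived jobs are
# grouped into numbered sub-directories holding JOBS_PER_ARCHIVE_DIRECTORY
# jobs each, so with the default of 10000 a job id of 123456 ends up in the
# directory named "12".  The same computation, spelled out (standalone helper,
# name is hypothetical):
def _ExampleArchiveDir(job_id):
  """Returns the archive sub-directory name for a numeric job id."""
  return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)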
1479 """Generates a new job identifier.
1480
1481 Job identifiers are unique during the lifetime of a cluster.
1482
1483 @type count: integer
1484 @param count: how many serials to return
1485 @rtype: list of str
1486 @return: a list of job identifiers
1487
1488 """
1489 assert count > 0
1490
1491 serial = self._last_serial + count
1492
1493
1494 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1495 "%s\n" % serial, True)
1496
1497 result = [self._FormatJobID(v)
1498 for v in range(self._last_serial + 1, serial + 1)]
1499
1500
1501 self._last_serial = serial
1502
1503 assert len(result) == count
1504
1505 return result
1506
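# Illustrative sketch, not part of the original module: serials are handed out
# in contiguous blocks; asking for "count" new serials bumps the last serial
# by count and returns every id in between.  For instance, with a last serial
# of 41 and count=3 the new job ids are 42, 43 and 44, and 44 is written back
# to the serial file.  The allocation step in isolation (standalone helper,
# name is hypothetical):
def _ExampleNewSerials(last_serial, count):
  """Returns (new last serial, list of newly allocated serial numbers)."""
  new_last = last_serial + count
  return (new_last, range(last_serial + 1, new_last + 1))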
1507 @staticmethod
1509 """Returns the job file for a given job id.
1510
1511 @type job_id: str
1512 @param job_id: the job identifier
1513 @rtype: str
1514 @return: the path to the job file
1515
1516 """
1517 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1518
1519 @classmethod
1521 """Returns the archived job file for a give job id.
1522
1523 @type job_id: str
1524 @param job_id: the job identifier
1525 @rtype: str
1526 @return: the path to the archived job file
1527
1528 """
1529 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1530 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1531
1533 """Return all known job IDs.
1534
1535 The method only looks at disk because it's a requirement that all
1536 jobs are present on disk (so in the _memcache we don't have any
1537 extra IDs).
1538
1539 @type sort: boolean
1540 @param sort: perform sorting on the returned job ids
1541 @rtype: list
1542 @return: the list of job IDs
1543
1544 """
1545 jlist = []
1546 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1547 m = self._RE_JOB_FILE.match(filename)
1548 if m:
1549 jlist.append(m.group(1))
1550 if sort:
1551 jlist = utils.NiceSort(jlist)
1552 return jlist
1553
1555 """Loads a job from the disk or memory.
1556
1557 Given a job id, this will return the cached job object if
1558 existing, or try to load the job from the disk. If loading from
1559 disk, it will also add the job to the cache.
1560
1561 @param job_id: the job id
1562 @rtype: L{_QueuedJob} or None
1563 @return: either None or the job object
1564
1565 """
1566 job = self._memcache.get(job_id, None)
1567 if job:
1568 logging.debug("Found job %s in memcache", job_id)
1569 return job
1570
1571 try:
1572 job = self._LoadJobFromDisk(job_id)
1573 if job is None:
1574 return job
1575 except errors.JobFileCorrupted:
1576 old_path = self._GetJobPath(job_id)
1577 new_path = self._GetArchivedJobPath(job_id)
1578 if old_path == new_path:
1579
1580 logging.exception("Can't parse job %s", job_id)
1581 else:
1582
1583 logging.exception("Can't parse job %s, will archive.", job_id)
1584 self._RenameFilesUnlocked([(old_path, new_path)])
1585 return None
1586
1587 self._memcache[job_id] = job
1588 logging.debug("Added job %s to the cache", job_id)
1589 return job
1590
1592 """Load the given job file from disk.
1593
1594 Given a job file, read, load and restore it in a _QueuedJob format.
1595
1596 @type job_id: string
1597 @param job_id: job identifier
1598 @rtype: L{_QueuedJob} or None
1599 @return: either None or the job object
1600
1601 """
1602 filepath = self._GetJobPath(job_id)
1603 logging.debug("Loading job from %s", filepath)
1604 try:
1605 raw_data = utils.ReadFile(filepath)
1606 except EnvironmentError, err:
1607 if err.errno in (errno.ENOENT, ):
1608 return None
1609 raise
1610
1611 try:
1612 data = serializer.LoadJson(raw_data)
1613 job = _QueuedJob.Restore(self, data)
1614 except Exception, err:
1615 raise errors.JobFileCorrupted(err)
1616
1617 return job
1618
1620 """Load the given job file from disk.
1621
1622 Given a job file, read, load and restore it in a _QueuedJob format.
1623 In case of error reading the job, it gets returned as None, and the
1624 exception is logged.
1625
1626 @type job_id: string
1627 @param job_id: job identifier
1628 @rtype: L{_QueuedJob} or None
1629 @return: either None or the job object
1630
1631 """
1632 try:
1633 return self._LoadJobFromDisk(job_id)
1634 except (errors.JobFileCorrupted, EnvironmentError):
1635 logging.exception("Can't load/parse job %s", job_id)
1636 return None
1637
1638 @staticmethod
1640 """Check if the queue is marked from drain.
1641
1642 This currently uses the queue drain file, which makes it a
1643 per-node flag. In the future this can be moved to the config file.
1644
1645 @rtype: boolean
1646 @return: True if the job queue is marked for draining
1647
1648 """
1649 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1650
1652 """Update the queue size.
1653
1654 """
1655 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1656
1657 @locking.ssynchronized(_LOCK)
1658 @_RequireOpenQueue
1677
1678 @_RequireOpenQueue
1680 """Create and store a new job.
1681
1682 This enters the job into our job queue and also puts it on the new
1683 queue, in order for it to be picked up by the queue processors.
1684
1685 @type job_id: job ID
1686 @param job_id: the job ID for the new job
1687 @type ops: list
1688 @param ops: The list of OpCodes that will become the new job.
1689 @rtype: L{_QueuedJob}
1690 @return: the job object to be queued
1691 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1692 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1693 @raise errors.GenericError: If an opcode is not valid
1694
1695 """
1696
1697
1698 if self._drained:
1699 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1700
1701 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1702 raise errors.JobQueueFull()
1703
1704 job = _QueuedJob(self, job_id, ops)
1705
1706
1707 for idx, op in enumerate(job.ops):
1708 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1709 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1710 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1711 " are %s" % (idx, op.priority, allowed))
1712
1713
1714 self.UpdateJobUnlocked(job)
1715
1716 self._queue_size += 1
1717
1718 logging.debug("Adding new job %s to the cache", job_id)
1719 self._memcache[job_id] = job
1720
1721 return job
1722
1723 @locking.ssynchronized(_LOCK)
1724 @_RequireOpenQueue
1734
1735 @locking.ssynchronized(_LOCK)
1736 @_RequireOpenQueue
1738 """Create and store multiple jobs.
1739
1740 @see: L{_SubmitJobUnlocked}
1741
1742 """
1743 results = []
1744 added_jobs = []
1745 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1746 for job_id, ops in zip(all_job_ids, jobs):
1747 try:
1748 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1749 status = True
1750 data = job_id
1751 except errors.GenericError, err:
1752 data = str(err)
1753 status = False
1754 results.append((status, data))
1755
1756 self._EnqueueJobs(added_jobs)
1757
1758 return results
1759
1761 """Helper function to add jobs to worker pool's queue.
1762
1763 @type jobs: list
1764 @param jobs: List of all jobs
1765
1766 """
1767 self._wpool.AddManyTasks([(job, ) for job in jobs],
1768 priority=[job.CalcPriority() for job in jobs])
1769
1770 @_RequireOpenQueue
1772 """Update a job's on disk storage.
1773
1774 After a job has been modified, this function needs to be called in
1775 order to write the changes to disk and replicate them to the other
1776 nodes.
1777
1778 @type job: L{_QueuedJob}
1779 @param job: the changed job
1780 @type replicate: boolean
1781 @param replicate: whether to replicate the change to remote nodes
1782
1783 """
1784 if __debug__:
1785 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1786 assert (finalized ^ (job.end_timestamp is None))
1787
1788 filename = self._GetJobPath(job.id)
1789 data = serializer.DumpJson(job.Serialize(), indent=False)
1790 logging.debug("Writing job %s to %s", job.id, filename)
1791 self._UpdateJobQueueFile(filename, data, replicate)
1792
1793 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1794 timeout):
1795 """Waits for changes in a job.
1796
1797 @type job_id: string
1798 @param job_id: Job identifier
1799 @type fields: list of strings
1800 @param fields: Which fields to check for changes
1801 @type prev_job_info: list or None
1802 @param prev_job_info: Last job information returned
1803 @type prev_log_serial: int
1804 @param prev_log_serial: Last job message serial number
1805 @type timeout: float
1806 @param timeout: maximum time to wait in seconds
1807 @rtype: tuple (job info, log entries)
1808 @return: a tuple of the job information as required via
1809 the fields parameter, and the log entries as a list
1810
1811 if the job has not changed and the timeout has expired,
1812 we instead return a special value,
1813 L{constants.JOB_NOTCHANGED}, which should be interpreted
1814 as such by the clients
1815
1816 """
1817 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1818
1819 helper = _WaitForJobChangesHelper()
1820
1821 return helper(self._GetJobPath(job_id), load_fn,
1822 fields, prev_job_info, prev_log_serial, timeout)
1823
1824 @locking.ssynchronized(_LOCK)
1825 @_RequireOpenQueue
1827 """Cancels a job.
1828
1829 This will only succeed if the job has not started yet.
1830
1831 @type job_id: string
1832 @param job_id: job ID of job to be cancelled.
1833
1834 """
1835 logging.info("Cancelling job %s", job_id)
1836
1837 job = self._LoadJobUnlocked(job_id)
1838 if not job:
1839 logging.debug("Job %s not found", job_id)
1840 return (False, "Job %s not found" % job_id)
1841
1842 (success, msg) = job.Cancel()
1843
1844 if success:
1845
1846
1847 self.UpdateJobUnlocked(job)
1848
1849 return (success, msg)
1850
1851 @_RequireOpenQueue
1853 """Archives jobs.
1854
1855 @type jobs: list of L{_QueuedJob}
1856 @param jobs: Job objects
1857 @rtype: int
1858 @return: Number of archived jobs
1859
1860 """
1861 archive_jobs = []
1862 rename_files = []
1863 for job in jobs:
1864 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1865 logging.debug("Job %s is not yet done", job.id)
1866 continue
1867
1868 archive_jobs.append(job)
1869
1870 old = self._GetJobPath(job.id)
1871 new = self._GetArchivedJobPath(job.id)
1872 rename_files.append((old, new))
1873
1874
1875 self._RenameFilesUnlocked(rename_files)
1876
1877 logging.debug("Successfully archived job(s) %s",
1878 utils.CommaJoin(job.id for job in archive_jobs))
1879
1880
1881
1882
1883
1884 self._UpdateQueueSizeUnlocked()
1885 return len(archive_jobs)
1886
1887 @locking.ssynchronized(_LOCK)
1888 @_RequireOpenQueue
1890 """Archives a job.
1891
1892 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1893
1894 @type job_id: string
1895 @param job_id: Job ID of job to be archived.
1896 @rtype: bool
1897 @return: Whether job was archived
1898
1899 """
1900 logging.info("Archiving job %s", job_id)
1901
1902 job = self._LoadJobUnlocked(job_id)
1903 if not job:
1904 logging.debug("Job %s not found", job_id)
1905 return False
1906
1907 return self._ArchiveJobsUnlocked([job]) == 1
1908
1909 @locking.ssynchronized(_LOCK)
1910 @_RequireOpenQueue
1912 """Archives all jobs based on age.
1913
1914 The method will archive all jobs which are older than the age
1915 parameter. For jobs that don't have an end timestamp, the start
1916 timestamp will be considered. The special '-1' age will cause
1917 archival of all jobs (that are not running or queued).
1918
1919 @type age: int
1920 @param age: the minimum age in seconds
1921
1922 """
1923 logging.info("Archiving jobs with age more than %s seconds", age)
1924
1925 now = time.time()
1926 end_time = now + timeout
1927 archived_count = 0
1928 last_touched = 0
1929
1930 all_job_ids = self._GetJobIDsUnlocked()
1931 pending = []
1932 for idx, job_id in enumerate(all_job_ids):
1933 last_touched = idx + 1
1934
1935
1936
1937
1938 if time.time() > end_time:
1939 break
1940
1941
1942 job = self._LoadJobUnlocked(job_id)
1943 if job:
1944 if job.end_timestamp is None:
1945 if job.start_timestamp is None:
1946 job_age = job.received_timestamp
1947 else:
1948 job_age = job.start_timestamp
1949 else:
1950 job_age = job.end_timestamp
1951
1952 if age == -1 or now - job_age[0] > age:
1953 pending.append(job)
1954
1955
1956 if len(pending) >= 10:
1957 archived_count += self._ArchiveJobsUnlocked(pending)
1958 pending = []
1959
1960 if pending:
1961 archived_count += self._ArchiveJobsUnlocked(pending)
1962
1963 return (archived_count, len(all_job_ids) - last_touched)
1964
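# Illustrative sketch, not part of the original module: when deciding whether a
# job is old enough to archive, AutoArchiveJobs() above prefers the end
# timestamp, then the start timestamp, then the received timestamp, and an age
# of -1 archives every finalized job regardless.  The timestamp selection in
# isolation (standalone helper, name is hypothetical):
def _ExampleArchivalTimestamp(received_ts, start_ts, end_ts):
  """Picks the timestamp used for the age-based archival decision."""
  if end_ts is not None:
    return end_ts
  if start_ts is not None:
    return start_ts
  return received_ts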
1966 """Returns a list of jobs in queue.
1967
1968 @type job_ids: list
1969 @param job_ids: sequence of job identifiers or None for all
1970 @type fields: list
1971 @param fields: names of fields to return
1972 @rtype: list
1973 @return: list one element per job, each element being list with
1974 the requested fields
1975
1976 """
1977 jobs = []
1978 list_all = False
1979 if not job_ids:
1980
1981
1982 job_ids = self._GetJobIDsUnlocked()
1983 list_all = True
1984
1985 for job_id in job_ids:
1986 job = self.SafeLoadJobFromDisk(job_id)
1987 if job is not None:
1988 jobs.append(job.GetInfo(fields))
1989 elif not list_all:
1990 jobs.append(None)
1991
1992 return jobs
1993
1994 @locking.ssynchronized(_LOCK)
1995 @_RequireOpenQueue
1997 """Stops the job queue.
1998
1999 This shuts down all the worker threads and closes the queue.
2000
2001 """
2002 self._wpool.TerminateWorkers()
2003
2004 self._queue_filelock.Close()
2005 self._queue_filelock = None
2006