22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
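# Some pyinotify versions ship the real module nested inside a package of
# the same name, others expose it at the top level; try the nested layout
# first and fall back to the plain import.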
40
41 from pyinotify import pyinotify
42 except ImportError:
43 import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62
63
64 JOBQUEUE_THREADS = 25
65 JOBS_PER_ARCHIVE_DIRECTORY = 10000
66
67
68 _LOCK = "_lock"
69 _QUEUE = "_queue"
73 """Special exception to cancel a job.
74
75 """
76
79 """Special exception to abort a job when the job queue is shutting down.
80
81 """
82
85 """Returns the current timestamp.
86
87 @rtype: tuple
88 @return: the current time in the (seconds, microseconds) format
89
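Illustrative example (the actual values depend on the current clock)::

  TimeStampNow() -> (1330954341, 372982)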
90 """
91 return utils.SplitTime(time.time())
92
95 """Wrapper for job queries.
96
97 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
98
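Illustrative sketch (the field names are valid job query fields, the
returned values are made up)::

  status_query = _SimpleJobQuery(["id", "status"])
  status_query(job) -> ["123", "running"]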
99 """
100 - def __init__(self, fields):
101 """Initializes this class.
102
103 """
104 self._query = query.Query(query.JOB_FIELDS, fields)
105
106 - def __call__(self, job):
107 """Executes a job query using cached field list.
108
109 """
110 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
111
114 """Encapsulates an opcode object.
115
116 @ivar log: holds the execution log and consists of tuples
117 of the form C{(log_serial, timestamp, level, message)}
118 @ivar input: the OpCode we encapsulate
119 @ivar status: the current status
120 @ivar result: the result of the LU execution
121 @ivar start_timestamp: timestamp for the start of the execution
122 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
123 @ivar end_timestamp: timestamp for the end of the execution
124
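An illustrative log entry (all values made up)::

  (1, (1330954341, 372982), constants.ELOG_MESSAGE, "lock acquired")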
125 """
126 __slots__ = ["input", "status", "result", "log", "priority",
127 "start_timestamp", "exec_timestamp", "end_timestamp",
128 "__weakref__"]
129
131 """Initializes instances of this class.
132
133 @type op: L{opcodes.OpCode}
134 @param op: the opcode we encapsulate
135
136 """
137 self.input = op
138 self.status = constants.OP_STATUS_QUEUED
139 self.result = None
140 self.log = []
141 self.start_timestamp = None
142 self.exec_timestamp = None
143 self.end_timestamp = None
144
145 # Get initial priority (it might change during the lifetime of this opcode)
146 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
147
148 @classmethod
150 """Restore the _QueuedOpCode from the serialized form.
151
152 @type state: dict
153 @param state: the serialized state
154 @rtype: _QueuedOpCode
155 @return: a new _QueuedOpCode instance
156
157 """
158 obj = _QueuedOpCode.__new__(cls)
159 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
160 obj.status = state["status"]
161 obj.result = state["result"]
162 obj.log = state["log"]
163 obj.start_timestamp = state.get("start_timestamp", None)
164 obj.exec_timestamp = state.get("exec_timestamp", None)
165 obj.end_timestamp = state.get("end_timestamp", None)
166 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
167 return obj
168
170 """Serializes this _QueuedOpCode.
171
172 @rtype: dict
173 @return: the dictionary holding the serialized state
174
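Minimal round-trip sketch (assuming C{op} is a valid L{opcodes.OpCode})::

  state = _QueuedOpCode(op).Serialize()
  clone = _QueuedOpCode.Restore(state)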
175 """
176 return {
177 "input": self.input.__getstate__(),
178 "status": self.status,
179 "result": self.result,
180 "log": self.log,
181 "start_timestamp": self.start_timestamp,
182 "exec_timestamp": self.exec_timestamp,
183 "end_timestamp": self.end_timestamp,
184 "priority": self.priority,
185 }
186
189 """In-memory job representation.
190
191 This is what we use to track the user-submitted jobs. Locking must
192 be taken care of by users of this class.
193
194 @type queue: L{JobQueue}
195 @ivar queue: the parent queue
196 @ivar id: the job ID
197 @type ops: list
198 @ivar ops: the list of _QueuedOpCode that constitute the job
199 @type log_serial: int
200 @ivar log_serial: holds the index for the next log entry
201 @ivar received_timestamp: the timestamp for when the job was received
202 @ivar start_timestamp: the timestamp for start of execution
203 @ivar end_timestamp: the timestamp for end of execution
204 @ivar writable: Whether the job is allowed to be modified
205
206 """
207
208 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
209 "received_timestamp", "start_timestamp", "end_timestamp",
210 "__weakref__", "processor_lock", "writable"]
211
212 - def __init__(self, queue, job_id, ops, writable):
213 """Constructor for the _QueuedJob.
214
215 @type queue: L{JobQueue}
216 @param queue: our parent queue
217 @type job_id: string
218 @param job_id: our job id
219 @type ops: list
220 @param ops: the list of opcodes we hold, which will be encapsulated
221 in _QueuedOpCodes
222 @type writable: bool
223 @param writable: Whether job can be modified
224
225 """
226 if not ops:
227 raise errors.GenericError("A job needs at least one opcode")
228
229 self.queue = queue
230 self.id = job_id
231 self.ops = [_QueuedOpCode(op) for op in ops]
232 self.log_serial = 0
233 self.received_timestamp = TimeStampNow()
234 self.start_timestamp = None
235 self.end_timestamp = None
236
237 self._InitInMemory(self, writable)
238
239 @staticmethod
241 """Initializes in-memory variables.
242
243 """
244 obj.writable = writable
245 obj.ops_iter = None
246 obj.cur_opctx = None
247
248
249 if writable:
250 obj.processor_lock = threading.Lock()
251 else:
252 obj.processor_lock = None
253
254 - def __repr__(self):
255 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
256 "id=%s" % self.id,
257 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
258
259 return "<%s at %#x>" % (" ".join(status), id(self))
260
261 @classmethod
262 - def Restore(cls, queue, state, writable):
263 """Restore a _QueuedJob from serialized state:
264
265 @type queue: L{JobQueue}
266 @param queue: to which queue the restored job belongs
267 @type state: dict
268 @param state: the serialized state
269 @type writable: bool
270 @param writable: Whether job can be modified
271 @rtype: _QueuedJob
272 @return: the restored _QueuedJob instance
273
274 """
275 obj = _QueuedJob.__new__(cls)
276 obj.queue = queue
277 obj.id = state["id"]
278 obj.received_timestamp = state.get("received_timestamp", None)
279 obj.start_timestamp = state.get("start_timestamp", None)
280 obj.end_timestamp = state.get("end_timestamp", None)
281
282 obj.ops = []
283 obj.log_serial = 0
284 for op_state in state["ops"]:
285 op = _QueuedOpCode.Restore(op_state)
286 for log_entry in op.log:
287 obj.log_serial = max(obj.log_serial, log_entry[0])
288 obj.ops.append(op)
289
290 cls._InitInMemory(obj, writable)
291
292 return obj
293
295 """Serialize the _JobQueue instance.
296
297 @rtype: dict
298 @return: the serialized state
299
300 """
301 return {
302 "id": self.id,
303 "ops": [op.Serialize() for op in self.ops],
304 "start_timestamp": self.start_timestamp,
305 "end_timestamp": self.end_timestamp,
306 "received_timestamp": self.received_timestamp,
307 }
308
361
363 """Gets the current priority for this job.
364
365 Only unfinished opcodes are considered. When all are done, the default
366 priority is used.
367
368 @rtype: int
369
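Example (numerically lower values mean higher priority, hence C{min};
the values are illustrative)::

  unfinished opcode priorities [0, -10] -> job priority -10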
370 """
371 priorities = [op.priority for op in self.ops
372 if op.status not in constants.OPS_FINALIZED]
373
374 if not priorities:
375
376 return constants.OP_PRIO_DEFAULT
377
378 return min(priorities)
379
381 """Selectively returns the log entries.
382
383 @type newer_than: None or int
384 @param newer_than: if this is None, return all log entries,
385 otherwise return only the log entries with serial higher
386 than this value
387 @rtype: list
388 @return: the list of the log entries selected
389
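Illustrative example (assuming entries with serials 1, 2 and 3 exist)::

  GetLogEntries(None) -> entries 1, 2 and 3
  GetLogEntries(2) -> entry 3 only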
390 """
391 if newer_than is None:
392 serial = -1
393 else:
394 serial = newer_than
395
396 entries = []
397 for op in self.ops:
398 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
399
400 return entries
401
403 """Returns information about a job.
404
405 @type fields: list
406 @param fields: names of fields to return
407 @rtype: list
408 @return: list with one element for each field
409 @raise errors.OpExecError: when an invalid field
410 has been passed
411
412 """
413 return _SimpleJobQuery(fields)(self)
414
416 """Mark unfinished opcodes with a given status and result.
417
418 This is an utility function for marking all running or waiting to
419 be run opcodes with a given status. Opcodes which are already
420 finalised are not changed.
421
422 @param status: a given opcode status
423 @param result: the opcode result
424
425 """
426 not_marked = True
427 for op in self.ops:
428 if op.status in constants.OPS_FINALIZED:
429 assert not_marked, "Finalized opcodes found after non-finalized ones"
430 continue
431 op.status = status
432 op.result = result
433 not_marked = False
434
436 """Marks the job as finalized.
437
438 """
439 self.end_timestamp = TimeStampNow()
440
465
469 """Initializes this class.
470
471 @type queue: L{JobQueue}
472 @param queue: Job queue
473 @type job: L{_QueuedJob}
474 @param job: Job object
475 @type op: L{_QueuedOpCode}
476 @param op: OpCode
477
478 """
479 assert queue, "Queue is missing"
480 assert job, "Job is missing"
481 assert op, "Opcode is missing"
482
483 self._queue = queue
484 self._job = job
485 self._op = op
486
487 - def _CheckCancel(self):
488 """Raises L{CancelJob} or L{QueueShutdown} if asked to.
489
490 """
491 # Cancel here if we were asked to
492 if self._op.status == constants.OP_STATUS_CANCELING:
493 logging.debug("Canceling opcode")
494 raise CancelJob()
495
496 # See if queue is shutting down
497 if not self._queue.AcceptingJobsUnlocked():
498 logging.debug("Queue is shutting down")
499 raise QueueShutdown()
500
501 @locking.ssynchronized(_QUEUE, shared=1)
503 """Mark the opcode as running, not lock-waiting.
504
505 This is called from the mcpu code as a notifier function, when the LU is
506 finally about to start the Exec() method. Of course, to have end-user
507 visible results, the opcode must be initially (before calling into
508 Processor.ExecOpCode) set to OP_STATUS_WAITING.
509
510 """
511 assert self._op in self._job.ops
512 assert self._op.status in (constants.OP_STATUS_WAITING,
513 constants.OP_STATUS_CANCELING)
514
515
516 self._CheckCancel()
517
518 logging.debug("Opcode is now running")
519
520 self._op.status = constants.OP_STATUS_RUNNING
521 self._op.exec_timestamp = TimeStampNow()
522
523
524 self._queue.UpdateJobUnlocked(self._job)
525
526 @locking.ssynchronized(_QUEUE, shared=1)
528 """Internal feedback append function, with locks
529
530 """
531 self._job.log_serial += 1
532 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
533 self._queue.UpdateJobUnlocked(self._job, replicate=False)
534
551
561
563 """Submits jobs for processing.
564
565 See L{JobQueue.SubmitManyJobs}.
566
567 """
568
569 return self._queue.SubmitManyJobs(jobs)
570
571
572 -class _JobChangesChecker:
573 - def __init__(self, fields, prev_job_info, prev_log_serial):
574 """Initializes this class.
575
576 @type fields: list of strings
577 @param fields: Fields requested by LUXI client
578 @type prev_job_info: string
579 @param prev_job_info: previous job info, as passed by the LUXI client
580 @type prev_log_serial: string
581 @param prev_log_serial: previous job serial, as passed by the LUXI client
582
583 """
584 self._squery = _SimpleJobQuery(fields)
585 self._prev_job_info = prev_job_info
586 self._prev_log_serial = prev_log_serial
587
624
628 """Initializes this class.
629
630 @type filename: string
631 @param filename: Path to job file
632 @raises errors.InotifyError: if the notifier cannot be setup
633
634 """
635 self._wm = pyinotify.WatchManager()
636 self._inotify_handler = \
637 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
638 self._notifier = \
639 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
640 try:
641 self._inotify_handler.enable()
642 except Exception:
643
644 self._notifier.stop()
645 raise
646
648 """Callback for inotify.
649
650 """
651 if not notifier_enabled:
652 self._inotify_handler.enable()
653
654 - def Wait(self, timeout):
655 """Waits for the job file to change.
656
657 @type timeout: float
658 @param timeout: Timeout in seconds
659 @return: Whether there have been events
660
661 """
662 assert timeout >= 0
663 have_events = self._notifier.check_events(timeout * 1000)
664 if have_events:
665 self._notifier.read_events()
666 self._notifier.process_events()
667 return have_events
668
670 """Closes underlying notifier and its file descriptor.
671
672 """
673 self._notifier.stop()
674
678 """Initializes this class.
679
680 @type filename: string
681 @param filename: Path to job file
682
683 """
684 self._filewaiter = None
685 self._filename = filename
686
687 - def Wait(self, timeout):
688 """Waits for a job to change.
689
690 @type timeout: float
691 @param timeout: Timeout in seconds
692 @return: Whether there have been events
693
694 """
695 if self._filewaiter:
696 return self._filewaiter.Wait(timeout)
697
698
699
700
701
702 self._filewaiter = _JobFileChangesWaiter(self._filename)
703
704 return True
705
707 """Closes underlying waiter.
708
709 """
710 if self._filewaiter:
711 self._filewaiter.Close()
712
715 """Helper class using inotify to wait for changes in a job file.
716
717 This class takes a previous job status and serial, and alerts the client when
718 the current job status has changed.
719
720 """
721 @staticmethod
722 - def _CheckForChanges(counter, job_load_fn, check_fn):
723 if counter.next() > 0:
724 # If this isn't the first check, the job is given some more time
725 # to change again; this gives better performance for jobs
726 # generating many changes/messages
727 time.sleep(0.1)
728
729 job = job_load_fn()
730 if not job:
731 raise errors.JobLost()
732
733 result = check_fn(job)
734 if result is None:
735 raise utils.RetryAgain()
736
737 return result
738
739 - def __call__(self, filename, job_load_fn,
740 fields, prev_job_info, prev_log_serial, timeout):
741 """Waits for changes on a job.
742
743 @type filename: string
744 @param filename: File on which to wait for changes
745 @type job_load_fn: callable
746 @param job_load_fn: Function to load job
747 @type fields: list of strings
748 @param fields: Which fields to check for changes
749 @type prev_job_info: list or None
750 @param prev_job_info: Last job information returned
751 @type prev_log_serial: int
752 @param prev_log_serial: Last job message serial number
753 @type timeout: float
754 @param timeout: maximum time to wait in seconds
755
756 """
757 counter = itertools.count()
758 try:
759 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
760 waiter = _JobChangesWaiter(filename)
761 try:
762 return utils.Retry(compat.partial(self._CheckForChanges,
763 counter, job_load_fn, check_fn),
764 utils.RETRY_REMAINING_TIME, timeout,
765 wait_fn=waiter.Wait)
766 finally:
767 waiter.Close()
768 except (errors.InotifyError, errors.JobLost):
769 return None
770 except utils.RetryTimeout:
771 return constants.JOB_NOTCHANGED
772
784
788 """Initializes this class.
789
790 """
791 self._fn = fn
792 self._next = None
793
795 """Gets the next timeout if necessary.
796
797 """
798 if self._next is None:
799 self._next = self._fn()
800
802 """Returns the next timeout.
803
804 """
805 self._Advance()
806 return self._next
807
809 """Returns the current timeout and advances the internal state.
810
811 """
812 self._Advance()
813 result = self._next
814 self._next = None
815 return result
816
817
818 -class _OpExecContext:
819 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
820 """Initializes this class.
821
822 """
823 self.op = op
824 self.index = index
825 self.log_prefix = log_prefix
826 self.summary = op.input.Summary()
827
828
829 if getattr(op.input, opcodes.DEPEND_ATTR, None):
830 self.jobdeps = op.input.depends[:]
831 else:
832 self.jobdeps = None
833
834 self._timeout_strategy_factory = timeout_strategy_factory
835 self._ResetTimeoutStrategy()
836
838 """Creates a new timeout strategy.
839
840 """
841 self._timeout_strategy = \
842 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
843
845 """Checks whether priority can and should be increased.
846
847 Called when locks couldn't be acquired.
848
849 """
850 op = self.op
851
852
853
854 if (self._timeout_strategy.Peek() is None and
855 op.priority > constants.OP_PRIO_HIGHEST):
856 logging.debug("Increasing priority")
857 op.priority -= 1
858 self._ResetTimeoutStrategy()
859 return True
860
861 return False
862
864 """Returns the next lock acquire timeout.
865
866 """
867 return self._timeout_strategy.Next()
868
869
870 -class _JobProcessor:
871 (DEFER,
872 WAITDEP,
873 FINISHED) = range(1, 4)
874
877 """Initializes this class.
878
879 """
880 self.queue = queue
881 self.opexec_fn = opexec_fn
882 self.job = job
883 self._timeout_strategy_factory = _timeout_strategy_factory
884
885 @staticmethod
887 """Locates the next opcode to run.
888
889 @type job: L{_QueuedJob}
890 @param job: Job object
891 @param timeout_strategy_factory: Callable to create new timeout strategy
892
893 """
894
895
896
897
898 if job.ops_iter is None:
899 job.ops_iter = enumerate(job.ops)
900
901
902 while True:
903 try:
904 (idx, op) = job.ops_iter.next()
905 except StopIteration:
906 raise errors.ProgrammerError("Called for a finished job")
907
908 if op.status == constants.OP_STATUS_RUNNING:
909
910 raise errors.ProgrammerError("Called for job marked as running")
911
912 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
913 timeout_strategy_factory)
914
915 if op.status not in constants.OPS_FINALIZED:
916 return opctx
917
918
919
920
921
922 logging.info("%s: opcode %s already processed, skipping",
923 opctx.log_prefix, opctx.summary)
924
925 @staticmethod
927 """Marks an opcode as waiting for locks.
928
929 The job's start timestamp is also set if necessary.
930
931 @type job: L{_QueuedJob}
932 @param job: Job object
933 @type op: L{_QueuedOpCode}
934 @param op: Opcode object
935
936 """
937 assert op in job.ops
938 assert op.status in (constants.OP_STATUS_QUEUED,
939 constants.OP_STATUS_WAITING)
940
941 update = False
942
943 op.result = None
944
945 if op.status == constants.OP_STATUS_QUEUED:
946 op.status = constants.OP_STATUS_WAITING
947 update = True
948
949 if op.start_timestamp is None:
950 op.start_timestamp = TimeStampNow()
951 update = True
952
953 if job.start_timestamp is None:
954 job.start_timestamp = op.start_timestamp
955 update = True
956
957 assert op.status == constants.OP_STATUS_WAITING
958
959 return update
960
961 @staticmethod
963 """Checks if an opcode has dependencies and if so, processes them.
964
965 @type queue: L{JobQueue}
966 @param queue: Queue object
967 @type job: L{_QueuedJob}
968 @param job: Job object
969 @type opctx: L{_OpExecContext}
970 @param opctx: Opcode execution context
971 @rtype: bool
972 @return: Whether opcode will be re-scheduled by dependency tracker
973
974 """
975 op = opctx.op
976
977 result = False
978
979 while opctx.jobdeps:
980 (dep_job_id, dep_status) = opctx.jobdeps[0]
981
982 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
983 dep_status)
984 assert ht.TNonEmptyString(depmsg), "No dependency message"
985
986 logging.info("%s: %s", opctx.log_prefix, depmsg)
987
988 if depresult == _JobDependencyManager.CONTINUE:
989
990 opctx.jobdeps.pop(0)
991
992 elif depresult == _JobDependencyManager.WAIT:
993
994
995 result = True
996 break
997
998 elif depresult == _JobDependencyManager.CANCEL:
999
1000 job.Cancel()
1001 assert op.status == constants.OP_STATUS_CANCELING
1002 break
1003
1004 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1005 _JobDependencyManager.ERROR):
1006
1007 op.status = constants.OP_STATUS_ERROR
1008 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1009 break
1010
1011 else:
1012 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1013 depresult)
1014
1015 return result
1016
1018 """Processes one opcode and returns the result.
1019
1020 """
1021 op = opctx.op
1022
1023 assert op.status == constants.OP_STATUS_WAITING
1024
1025 timeout = opctx.GetNextLockTimeout()
1026
1027 try:
1028
1029 result = self.opexec_fn(op.input,
1030 _OpExecCallbacks(self.queue, self.job, op),
1031 timeout=timeout, priority=op.priority)
1032 except mcpu.LockAcquireTimeout:
1033 assert timeout is not None, "Received timeout for blocking acquire"
1034 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1035
1036 assert op.status in (constants.OP_STATUS_WAITING,
1037 constants.OP_STATUS_CANCELING)
1038
1039
1040 if op.status == constants.OP_STATUS_CANCELING:
1041 return (constants.OP_STATUS_CANCELING, None)
1042
1043
1044 if not self.queue.AcceptingJobsUnlocked():
1045 return (constants.OP_STATUS_QUEUED, None)
1046
1047
1048 return (constants.OP_STATUS_WAITING, None)
1049 except CancelJob:
1050 logging.exception("%s: Canceling job", opctx.log_prefix)
1051 assert op.status == constants.OP_STATUS_CANCELING
1052 return (constants.OP_STATUS_CANCELING, None)
1053
1054 except QueueShutdown:
1055 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1056
1057 assert op.status == constants.OP_STATUS_WAITING
1058
1059
1060 return (constants.OP_STATUS_QUEUED, None)
1061
1062 except Exception, err:
1063 logging.exception("%s: Caught exception in %s",
1064 opctx.log_prefix, opctx.summary)
1065 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1066 else:
1067 logging.debug("%s: %s successful",
1068 opctx.log_prefix, opctx.summary)
1069 return (constants.OP_STATUS_SUCCESS, result)
1070
1072 """Continues execution of a job.
1073
1074 @param _nextop_fn: Callback function for tests
1075 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1076 be deferred and C{WAITDEP} if the dependency manager
1077 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1078
1079 """
1080 queue = self.queue
1081 job = self.job
1082
1083 logging.debug("Processing job %s", job.id)
1084
1085 queue.acquire(shared=1)
1086 try:
1087 opcount = len(job.ops)
1088
1089 assert job.writable, "Expected writable job"
1090
1091
1092 if job.CalcStatus() in constants.JOBS_FINALIZED:
1093 return self.FINISHED
1094
1095
1096 if job.cur_opctx:
1097 opctx = job.cur_opctx
1098 job.cur_opctx = None
1099 else:
1100 if __debug__ and _nextop_fn:
1101 _nextop_fn()
1102 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1103
1104 op = opctx.op
1105
1106
1107 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1108 constants.OP_STATUS_CANCELING)
1109 for i in job.ops[opctx.index + 1:])
1110
1111 assert op.status in (constants.OP_STATUS_QUEUED,
1112 constants.OP_STATUS_WAITING,
1113 constants.OP_STATUS_CANCELING)
1114
1115 assert (op.priority <= constants.OP_PRIO_LOWEST and
1116 op.priority >= constants.OP_PRIO_HIGHEST)
1117
1118 waitjob = None
1119
1120 if op.status != constants.OP_STATUS_CANCELING:
1121 assert op.status in (constants.OP_STATUS_QUEUED,
1122 constants.OP_STATUS_WAITING)
1123
1124
1125 if self._MarkWaitlock(job, op):
1126
1127 queue.UpdateJobUnlocked(job)
1128
1129 assert op.status == constants.OP_STATUS_WAITING
1130 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1131 assert job.start_timestamp and op.start_timestamp
1132 assert waitjob is None
1133
1134
1135 waitjob = self._CheckDependencies(queue, job, opctx)
1136
1137 assert op.status in (constants.OP_STATUS_WAITING,
1138 constants.OP_STATUS_CANCELING,
1139 constants.OP_STATUS_ERROR)
1140
1141 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1142 constants.OP_STATUS_ERROR)):
1143 logging.info("%s: opcode %s waiting for locks",
1144 opctx.log_prefix, opctx.summary)
1145
1146 assert not opctx.jobdeps, "Not all dependencies were removed"
1147
1148 queue.release()
1149 try:
1150 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1151 finally:
1152 queue.acquire(shared=1)
1153
1154 op.status = op_status
1155 op.result = op_result
1156
1157 assert not waitjob
1158
1159 if op.status in (constants.OP_STATUS_WAITING,
1160 constants.OP_STATUS_QUEUED):
1161
1162
1163 assert not op.end_timestamp
1164 else:
1165
1166 op.end_timestamp = TimeStampNow()
1167
1168 if op.status == constants.OP_STATUS_CANCELING:
1169 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1170 for i in job.ops[opctx.index:])
1171 else:
1172 assert op.status in constants.OPS_FINALIZED
1173
1174 if op.status == constants.OP_STATUS_QUEUED:
1175
1176 assert not waitjob
1177
1178 finalize = False
1179
1180
1181 job.cur_opctx = None
1182
1183
1184 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1185
1186 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1187 finalize = False
1188
1189 if not waitjob and opctx.CheckPriorityIncrease():
1190
1191 queue.UpdateJobUnlocked(job)
1192
1193
1194 job.cur_opctx = opctx
1195
1196 assert (op.priority <= constants.OP_PRIO_LOWEST and
1197 op.priority >= constants.OP_PRIO_HIGHEST)
1198
1199
1200 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1201
1202 else:
1203
1204 assert (opctx.index == 0 or
1205 compat.all(i.status == constants.OP_STATUS_SUCCESS
1206 for i in job.ops[:opctx.index]))
1207
1208
1209 job.cur_opctx = None
1210
1211 if op.status == constants.OP_STATUS_SUCCESS:
1212 finalize = False
1213
1214 elif op.status == constants.OP_STATUS_ERROR:
1215
1216 assert errors.GetEncodedError(job.ops[opctx.index].result)
1217
1218 to_encode = errors.OpExecError("Preceding opcode failed")
1219 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1220 _EncodeOpError(to_encode))
1221 finalize = True
1222
1223
1224 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1225 errors.GetEncodedError(i.result)
1226 for i in job.ops[opctx.index:])
1227
1228 elif op.status == constants.OP_STATUS_CANCELING:
1229 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1230 "Job canceled by request")
1231 finalize = True
1232
1233 else:
1234 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1235
1236 if opctx.index == (opcount - 1):
1237
1238 finalize = True
1239
1240 if finalize:
1241
1242 job.Finalize()
1243
1244
1245
1246 queue.UpdateJobUnlocked(job)
1247
1248 assert not waitjob
1249
1250 if finalize:
1251 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1252 return self.FINISHED
1253
1254 assert not waitjob or queue.depmgr.JobWaiting(job)
1255
1256 if waitjob:
1257 return self.WAITDEP
1258 else:
1259 return self.DEFER
1260 finally:
1261 assert job.writable, "Job became read-only while being processed"
1262 queue.release()
1263
1286
1289 """The actual job workers.
1290
1291 """
1293 """Job executor.
1294
1295 @type job: L{_QueuedJob}
1296 @param job: the job to be processed
1297
1298 """
1299 assert job.writable, "Expected writable job"
1300
1301
1302
1303
1304 job.processor_lock.acquire()
1305 try:
1306 return self._RunTaskInner(job)
1307 finally:
1308 job.processor_lock.release()
1309
1330
1331 @staticmethod
1333 """Updates the worker thread name to include a short summary of the opcode.
1334
1335 @param setname_fn: Callable setting worker thread name
1336 @param execop_fn: Callable for executing opcode (usually
1337 L{mcpu.Processor.ExecOpCode})
1338
1339 """
1340 setname_fn(op)
1341 try:
1342 return execop_fn(op, *args, **kwargs)
1343 finally:
1344 setname_fn(None)
1345
1346 @staticmethod
1348 """Sets the worker thread name.
1349
1350 @type job: L{_QueuedJob}
1351 @type op: L{opcodes.OpCode}
1352
1353 """
1354 parts = ["Job%s" % job.id]
1355
1356 if op:
1357 parts.append(op.TinySummary())
1358
1359 return "/".join(parts)
1360
1363 """Simple class implementing a job-processing workerpool.
1364
1365 """
1371
1374 """Keeps track of job dependencies.
1375
1376 """
1377 (WAIT,
1378 ERROR,
1379 CANCEL,
1380 CONTINUE,
1381 WRONGSTATUS) = range(1, 6)
1382
1383 - def __init__(self, getstatus_fn, enqueue_fn):
1384 """Initializes this class.
1385
1386 """
1387 self._getstatus_fn = getstatus_fn
1388 self._enqueue_fn = enqueue_fn
1389
1390 self._waiters = {}
1391 self._lock = locking.SharedLock("JobDepMgr")
1392
1393 @locking.ssynchronized(_LOCK, shared=1)
1395 """Retrieves information about waiting jobs.
1396
1397 @type requested: set
1398 @param requested: Requested information, see C{query.LQ_*}
1399
1400 """
1401
1402
1403
1404 return [("job/%s" % job_id, None, None,
1405 [("job", [job.id for job in waiters])])
1406 for job_id, waiters in self._waiters.items()
1407 if waiters]
1408
1409 @locking.ssynchronized(_LOCK, shared=1)
1411 """Checks if a job is waiting.
1412
1413 """
1414 return compat.any(job in jobs
1415 for jobs in self._waiters.values())
1416
1417 @locking.ssynchronized(_LOCK)
1419 """Checks if a dependency job has the requested status.
1420
1421 If the other job is not yet in a finalized status, the calling job will be
1422 notified (re-added to the workerpool) at a later point.
1423
1424 @type job: L{_QueuedJob}
1425 @param job: Job object
1426 @type dep_job_id: string
1427 @param dep_job_id: ID of dependency job
1428 @type dep_status: list
1429 @param dep_status: Required status
1430
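Hypothetical example of an already-finished dependency::

  CheckAndRegister(job, "123", [constants.JOB_STATUS_SUCCESS])
  -> (CONTINUE, "Dependency job 123 finished with status 'success'")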
1431 """
1432 assert ht.TString(job.id)
1433 assert ht.TString(dep_job_id)
1434 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1435
1436 if job.id == dep_job_id:
1437 return (self.ERROR, "Job can't depend on itself")
1438
1439
1440 try:
1441 status = self._getstatus_fn(dep_job_id)
1442 except errors.JobLost, err:
1443 return (self.ERROR, "Dependency error: %s" % err)
1444
1445 assert status in constants.JOB_STATUS_ALL
1446
1447 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1448
1449 if status not in constants.JOBS_FINALIZED:
1450
1451 job_id_waiters.add(job)
1452 return (self.WAIT,
1453 "Need to wait for job %s, wanted status '%s'" %
1454 (dep_job_id, dep_status))
1455
1456
1457 if job in job_id_waiters:
1458 job_id_waiters.remove(job)
1459
1460 if (status == constants.JOB_STATUS_CANCELED and
1461 constants.JOB_STATUS_CANCELED not in dep_status):
1462 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1463
1464 elif not dep_status or status in dep_status:
1465 return (self.CONTINUE,
1466 "Dependency job %s finished with status '%s'" %
1467 (dep_job_id, status))
1468
1469 else:
1470 return (self.WRONGSTATUS,
1471 "Dependency job %s finished with status '%s',"
1472 " not one of '%s' as required" %
1473 (dep_job_id, status, utils.CommaJoin(dep_status)))
1474
1476 """Remove all jobs without actual waiters.
1477
1478 """
1479 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1480 if not waiters]:
1481 del self._waiters[job_id]
1482
1484 """Notifies all jobs waiting for a certain job ID.
1485
1486 @attention: Do not call until L{CheckAndRegister} returned a status other
1487 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1488 @type job_id: string
1489 @param job_id: Job ID
1490
1491 """
1492 assert ht.TString(job_id)
1493
1494 self._lock.acquire()
1495 try:
1496 self._RemoveEmptyWaitersUnlocked()
1497
1498 jobs = self._waiters.pop(job_id, None)
1499 finally:
1500 self._lock.release()
1501
1502 if jobs:
1503
1504 logging.debug("Re-adding %s jobs which were waiting for job %s",
1505 len(jobs), job_id)
1506 self._enqueue_fn(jobs)
1507
1510 """Decorator for "public" functions.
1511
1512 This function should be used for all 'public' functions. That is,
1513 functions usually called from other classes. Note that this should
1514 be applied only to methods (not plain functions), since it expects
1515 that the decorated function is called with a first argument that has
1516 a '_queue_filelock' argument.
1517
1518 @warning: Use this decorator only after locking.ssynchronized
1519
1520 Example::
1521 @locking.ssynchronized(_LOCK)
1522 @_RequireOpenQueue
1523 def Example(self):
1524 pass
1525
1526 """
1527 def wrapper(self, *args, **kwargs):
1528
1529 assert self._queue_filelock is not None, "Queue should be open"
1530 return fn(self, *args, **kwargs)
1531 return wrapper
1532
1535 """Decorator checking for a non-drained queue.
1536
1537 To be used with functions submitting new jobs.
1538
1539 """
1540 def wrapper(self, *args, **kwargs):
1541 """Wrapper function.
1542
1543 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1544
1545 """
1546
1547
1548
1549 if self._drained:
1550 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1551
1552 if not self._accepting_jobs:
1553 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1554
1555 return fn(self, *args, **kwargs)
1556 return wrapper
1557
1560 """Queue used to manage the jobs.
1561
1562 """
1564 """Constructor for JobQueue.
1565
1566 The constructor will initialize the job queue object and then
1567 start loading the current jobs from disk, either for starting them
1568 (if they were queue) or for aborting them (if they were already
1569 running).
1570
1571 @type context: GanetiContext
1572 @param context: the context object for access to the configuration
1573 data and other ganeti objects
1574
1575 """
1576 self.context = context
1577 self._memcache = weakref.WeakValueDictionary()
1578 self._my_hostname = netutils.Hostname.GetSysName()
1579
1580
1581
1582
1583
1584
1585 self._lock = locking.SharedLock("JobQueue")
1586
1587 self.acquire = self._lock.acquire
1588 self.release = self._lock.release
1589
1590
1591 self._accepting_jobs = True
1592
1593
1594
1595 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1596
1597
1598 self._last_serial = jstore.ReadSerial()
1599 assert self._last_serial is not None, ("Serial file was modified between"
1600 " check in jstore and here")
1601
1602
1603 self._nodes = dict((n.name, n.primary_ip)
1604 for n in self.context.cfg.GetAllNodesInfo().values()
1605 if n.master_candidate)
1606
1607
1608 self._nodes.pop(self._my_hostname, None)
1609
1610
1611
1612 self._queue_size = None
1613 self._UpdateQueueSizeUnlocked()
1614 assert ht.TInt(self._queue_size)
1615 self._drained = jstore.CheckDrainFlag()
1616
1617
1618 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1619 self._EnqueueJobs)
1620 self.context.glm.AddToLockMonitor(self.depmgr)
1621
1622
1623 self._wpool = _JobQueueWorkerPool(self)
1624 try:
1625 self._InspectQueue()
1626 except:
1627 self._wpool.TerminateWorkers()
1628 raise
1629
1630 @locking.ssynchronized(_LOCK)
1631 @_RequireOpenQueue
1633 """Loads the whole job queue and resumes unfinished jobs.
1634
1635 This function needs the lock here because WorkerPool.AddTask() may start a
1636 job while we're still doing our work.
1637
1638 """
1639 logging.info("Inspecting job queue")
1640
1641 restartjobs = []
1642
1643 all_job_ids = self._GetJobIDsUnlocked()
1644 jobs_count = len(all_job_ids)
1645 lastinfo = time.time()
1646 for idx, job_id in enumerate(all_job_ids):
1647
1648 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1649 idx == (jobs_count - 1)):
1650 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1651 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1652 lastinfo = time.time()
1653
1654 job = self._LoadJobUnlocked(job_id)
1655
1656
1657 if job is None:
1658 continue
1659
1660 status = job.CalcStatus()
1661
1662 if status == constants.JOB_STATUS_QUEUED:
1663 restartjobs.append(job)
1664
1665 elif status in (constants.JOB_STATUS_RUNNING,
1666 constants.JOB_STATUS_WAITING,
1667 constants.JOB_STATUS_CANCELING):
1668 logging.warning("Unfinished job %s found: %s", job.id, job)
1669
1670 if status == constants.JOB_STATUS_WAITING:
1671
1672 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1673 restartjobs.append(job)
1674 else:
1675 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1676 "Unclean master daemon shutdown")
1677 job.Finalize()
1678
1679 self.UpdateJobUnlocked(job)
1680
1681 if restartjobs:
1682 logging.info("Restarting %s jobs", len(restartjobs))
1683 self._EnqueueJobsUnlocked(restartjobs)
1684
1685 logging.info("Job queue inspection finished")
1686
1688 """Gets RPC runner with context.
1689
1690 """
1691 return rpc.JobQueueRunner(self.context, address_list)
1692
1693 @locking.ssynchronized(_LOCK)
1694 @_RequireOpenQueue
1696 """Register a new node with the queue.
1697
1698 @type node: L{objects.Node}
1699 @param node: the node object to be added
1700
1701 """
1702 node_name = node.name
1703 assert node_name != self._my_hostname
1704
1705
1706 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1707 msg = result.fail_msg
1708 if msg:
1709 logging.warning("Cannot cleanup queue directory on node %s: %s",
1710 node_name, msg)
1711
1712 if not node.master_candidate:
1713
1714 self._nodes.pop(node_name, None)
1715
1716 return
1717
1718
1719 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1720
1721
1722 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1723
1724
1725 addrs = [node.primary_ip]
1726
1727 for file_name in files:
1728
1729 content = utils.ReadFile(file_name)
1730
1731 result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1732 content)
1733 msg = result[node_name].fail_msg
1734 if msg:
1735 logging.error("Failed to upload file %s to node %s: %s",
1736 file_name, node_name, msg)
1737
1738 self._nodes[node_name] = node.primary_ip
1739
1740 @locking.ssynchronized(_LOCK)
1741 @_RequireOpenQueue
1743 """Callback called when removing nodes from the cluster.
1744
1745 @type node_name: str
1746 @param node_name: the name of the node to remove
1747
1748 """
1749 self._nodes.pop(node_name, None)
1750
1751 @staticmethod
1753 """Verifies the status of an RPC call.
1754
1755 Since we aim to keep consistency should this node (the current
1756 master) fail, we will log errors if our rpc fail, and especially
1757 log the case when more than half of the nodes fails.
1758
1759 @param result: the data as returned from the rpc call
1760 @type nodes: list
1761 @param nodes: the list of nodes we made the call to
1762 @type failmsg: str
1763 @param failmsg: the identifier to be used for logging
1764
1765 """
1766 failed = []
1767 success = []
1768
1769 for node in nodes:
1770 msg = result[node].fail_msg
1771 if msg:
1772 failed.append(node)
1773 logging.error("RPC call %s (%s) failed on node %s: %s",
1774 result[node].call, failmsg, node, msg)
1775 else:
1776 success.append(node)
1777
1778 # +1 for the master node
1779 if (len(success) + 1) < len(failed):
1780
1781 logging.error("More than half of the nodes failed")
1782
1784 """Helper for returning the node name/ip list.
1785
1786 @rtype: (list, list)
1787 @return: a tuple of two lists, the first one with the node
1788 names and the second one with the node addresses
1789
1790 """
1791
1792 name_list = self._nodes.keys()
1793 addr_list = [self._nodes[name] for name in name_list]
1794 return name_list, addr_list
1795
1797 """Writes a file locally and then replicates it to all nodes.
1798
1799 This function will replace the contents of a file on the local
1800 node and then replicate it to all the other nodes we have.
1801
1802 @type file_name: str
1803 @param file_name: the path of the file to be replicated
1804 @type data: str
1805 @param data: the new contents of the file
1806 @type replicate: boolean
1807 @param replicate: whether to spread the changes to the remote nodes
1808
1809 """
1810 getents = runtime.GetEnts()
1811 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1812 gid=getents.masterd_gid)
1813
1814 if replicate:
1815 names, addrs = self._GetNodeIp()
1816 result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1817 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1818
1820 """Renames a file locally and then replicate the change.
1821
1822 This function will rename a file in the local queue directory
1823 and then replicate this rename to all the other nodes we have.
1824
1825 @type rename: list of (old, new)
1826 @param rename: List containing tuples mapping old to new names
1827
1828 """
1829
1830 for old, new in rename:
1831 utils.RenameFile(old, new, mkdir=True)
1832
1833
1834 names, addrs = self._GetNodeIp()
1835 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1836 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1837
1838 @staticmethod
1839 - def _FormatJobID(job_id):
1840 """Convert a job ID to string format.
1841
1842 Currently this just does C{str(job_id)} after some sanity checks.
1843
1844 @type job_id: int
1845 @param job_id: the numeric job id
1846 @rtype: str
1847 @return: the formatted job id
1848
1849 """
1850 if not isinstance(job_id, (int, long)):
1851 raise errors.ProgrammerError("Job queue: supplied job id is not an int")
1852 if job_id < 0:
1853 raise errors.ProgrammerError("Job queue: supplied job id < 0")
1854
1855 return str(job_id)
1856
1857
1858
1859 @classmethod
1861 """Returns the archive directory for a job.
1862
1863 @type job_id: str
1864 @param job_id: Job identifier
1865 @rtype: str
1866 @return: Directory name
1867
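Example (with C{JOBS_PER_ARCHIVE_DIRECTORY} at its default of 10000)::

  _GetArchiveDirectory("24") -> "0"
  _GetArchiveDirectory("12345") -> "1"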
1868 """
1869 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1870
1872 """Generates a new job identifier.
1873
1874 Job identifiers are unique during the lifetime of a cluster.
1875
1876 @type count: integer
1877 @param count: how many serials to return
1878 @rtype: list of str
1879 @return: a list of job identifiers.
1880
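Hypothetical example (assuming the last used serial was 7)::

  _NewSerialsUnlocked(2) -> ["8", "9"]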
1881 """
1882 assert ht.TPositiveInt(count)
1883
1884
1885 serial = self._last_serial + count
1886
1887
1888 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1889 "%s\n" % serial, True)
1890
1891 result = [self._FormatJobID(v)
1892 for v in range(self._last_serial + 1, serial + 1)]
1893
1894
1895 self._last_serial = serial
1896
1897 assert len(result) == count
1898
1899 return result
1900
1901 @staticmethod
1903 """Returns the job file for a given job id.
1904
1905 @type job_id: str
1906 @param job_id: the job identifier
1907 @rtype: str
1908 @return: the path to the job file
1909
1910 """
1911 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1912
1913 @classmethod
1915 """Returns the archived job file for a give job id.
1916
1917 @type job_id: str
1918 @param job_id: the job identifier
1919 @rtype: str
1920 @return: the path to the archived job file
1921
1922 """
1923 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1924 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1925
1926 @staticmethod
1928 """Return all known job IDs.
1929
1930 The method only looks at disk because it's a requirement that all
1931 jobs are present on disk (so in the _memcache we don't have any
1932 extra IDs).
1933
1934 @type sort: boolean
1935 @param sort: perform sorting on the returned job ids
1936 @rtype: list
1937 @return: the list of job IDs
1938
1939 """
1940 jlist = []
1941 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1942 m = constants.JOB_FILE_RE.match(filename)
1943 if m:
1944 jlist.append(m.group(1))
1945 if sort:
1946 jlist = utils.NiceSort(jlist)
1947 return jlist
1948
1950 """Loads a job from the disk or memory.
1951
1952 Given a job id, this will return the cached job object if
1953 existing, or try to load the job from the disk. If loading from
1954 disk, it will also add the job to the cache.
1955
1956 @param job_id: the job id
1957 @rtype: L{_QueuedJob} or None
1958 @return: either None or the job object
1959
1960 """
1961 job = self._memcache.get(job_id, None)
1962 if job:
1963 logging.debug("Found job %s in memcache", job_id)
1964 assert job.writable, "Found read-only job in memcache"
1965 return job
1966
1967 try:
1968 job = self._LoadJobFromDisk(job_id, False)
1969 if job is None:
1970 return job
1971 except errors.JobFileCorrupted:
1972 old_path = self._GetJobPath(job_id)
1973 new_path = self._GetArchivedJobPath(job_id)
1974 if old_path == new_path:
1975
1976 logging.exception("Can't parse job %s", job_id)
1977 else:
1978
1979 logging.exception("Can't parse job %s, will archive.", job_id)
1980 self._RenameFilesUnlocked([(old_path, new_path)])
1981 return None
1982
1983 assert job.writable, "Job just loaded is not writable"
1984
1985 self._memcache[job_id] = job
1986 logging.debug("Added job %s to the cache", job_id)
1987 return job
1988
1990 """Load the given job file from disk.
1991
1992 Given a job file, read, load and restore it in a _QueuedJob format.
1993
1994 @type job_id: string
1995 @param job_id: job identifier
1996 @type try_archived: bool
1997 @param try_archived: Whether to try loading an archived job
1998 @rtype: L{_QueuedJob} or None
1999 @return: either None or the job object
2000
2001 """
2002 path_functions = [(self._GetJobPath, True)]
2003
2004 if try_archived:
2005 path_functions.append((self._GetArchivedJobPath, False))
2006
2007 raw_data = None
2008 writable_default = None
2009
2010 for (fn, writable_default) in path_functions:
2011 filepath = fn(job_id)
2012 logging.debug("Loading job from %s", filepath)
2013 try:
2014 raw_data = utils.ReadFile(filepath)
2015 except EnvironmentError, err:
2016 if err.errno != errno.ENOENT:
2017 raise
2018 else:
2019 break
2020
2021 if not raw_data:
2022 return None
2023
2024 if writable is None:
2025 writable = writable_default
2026
2027 try:
2028 data = serializer.LoadJson(raw_data)
2029 job = _QueuedJob.Restore(self, data, writable)
2030 except Exception, err:
2031 raise errors.JobFileCorrupted(err)
2032
2033 return job
2034
2036 """Load the given job file from disk.
2037
2038 Given a job file, read, load and restore it in a _QueuedJob format.
2039 In case of error reading the job, it gets returned as None, and the
2040 exception is logged.
2041
2042 @type job_id: string
2043 @param job_id: job identifier
2044 @type try_archived: bool
2045 @param try_archived: Whether to try loading an archived job
2046 @rtype: L{_QueuedJob} or None
2047 @return: either None or the job object
2048
2049 """
2050 try:
2051 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2052 except (errors.JobFileCorrupted, EnvironmentError):
2053 logging.exception("Can't load/parse job %s", job_id)
2054 return None
2055
2057 """Update the queue size.
2058
2059 """
2060 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2061
2062 @locking.ssynchronized(_LOCK)
2063 @_RequireOpenQueue
2065 """Sets the drain flag for the queue.
2066
2067 @type drain_flag: boolean
2068 @param drain_flag: Whether to set or unset the drain flag
2069
2070 """
2071 jstore.SetDrainFlag(drain_flag)
2072
2073 self._drained = drain_flag
2074
2075 return True
2076
2077 @_RequireOpenQueue
2079 """Create and store a new job.
2080
2081 This enters the job into our job queue and also puts it on the new
2082 queue, in order for it to be picked up by the queue processors.
2083
2084 @type job_id: job ID
2085 @param job_id: the job ID for the new job
2086 @type ops: list
2087 @param ops: The list of OpCodes that will become the new job.
2088 @rtype: L{_QueuedJob}
2089 @return: the job object to be queued
2090 @raise errors.JobQueueFull: if the job queue has too many jobs in it
2091 @raise errors.GenericError: If an opcode is not valid
2092
2093 """
2094 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2095 raise errors.JobQueueFull()
2096
2097 job = _QueuedJob(self, job_id, ops, True)
2098
2099
2100 for idx, op in enumerate(job.ops):
2101 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2102 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2103 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2104 " are %s" % (idx, op.priority, allowed))
2105
2106 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2107 if not opcodes.TNoRelativeJobDependencies(dependencies):
2108 raise errors.GenericError("Opcode %s has invalid dependencies, must"
2109 " match %s: %s" %
2110 (idx, opcodes.TNoRelativeJobDependencies,
2111 dependencies))
2112
2113
2114 self.UpdateJobUnlocked(job)
2115
2116 self._queue_size += 1
2117
2118 logging.debug("Adding new job %s to the cache", job_id)
2119 self._memcache[job_id] = job
2120
2121 return job
2122
2123 @locking.ssynchronized(_LOCK)
2124 @_RequireOpenQueue
2125 @_RequireNonDrainedQueue
2126 - def SubmitJob(self, ops):
2127 """Create and store a new job.
2128
2129 @see: L{_SubmitJobUnlocked}
2130
2131 """
2132 (job_id, ) = self._NewSerialsUnlocked(1)
2133 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2134 return job_id
2135
2136 @locking.ssynchronized(_LOCK)
2137 @_RequireOpenQueue
2138 @_RequireNonDrainedQueue
2140 """Create and store multiple jobs.
2141
2142 @see: L{_SubmitJobUnlocked}
2143
2144 """
2145 all_job_ids = self._NewSerialsUnlocked(len(jobs))
2146
2147 (results, added_jobs) = \
2148 self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2149
2150 self._EnqueueJobsUnlocked(added_jobs)
2151
2152 return results
2153
2154 @staticmethod
2155 - def _FormatSubmitError(msg, ops):
2156 """Formats errors which occurred while submitting a job.
2157
2158 """
2159 return ("%s; opcodes %s" %
2160 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2161
2162 @staticmethod
2164 """Resolves relative job IDs in dependencies.
2165
2166 @type resolve_fn: callable
2167 @param resolve_fn: Function to resolve a relative job ID
2168 @type deps: list
2169 @param deps: Dependencies
2170 @rtype: list
2171 @return: Resolved dependencies
2172
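Hypothetical example, with C{-1} referring to the job submitted
immediately before this one (here job "122")::

  _ResolveJobDependencies(resolve_fn, [(-1, [constants.JOB_STATUS_SUCCESS])])
  -> (True, [("122", [constants.JOB_STATUS_SUCCESS])])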
2173 """
2174 result = []
2175
2176 for (dep_job_id, dep_status) in deps:
2177 if ht.TRelativeJobId(dep_job_id):
2178 assert ht.TInt(dep_job_id) and dep_job_id < 0
2179 try:
2180 job_id = resolve_fn(dep_job_id)
2181 except IndexError:
2182
2183 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2184 else:
2185 job_id = dep_job_id
2186
2187 result.append((job_id, dep_status))
2188
2189 return (True, result)
2190
2192 """Create and store multiple jobs.
2193
2194 @see: L{_SubmitJobUnlocked}
2195
2196 """
2197 results = []
2198 added_jobs = []
2199
2200 def resolve_fn(job_idx, reljobid):
2201 assert reljobid < 0
2202 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2203
2204 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2205 for op in ops:
2206 if getattr(op, opcodes.DEPEND_ATTR, None):
2207 (status, data) = \
2208 self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2209 op.depends)
2210 if not status:
2211
2212 assert ht.TNonEmptyString(data), "No error message"
2213 break
2214
2215 op.depends = data
2216 else:
2217 try:
2218 job = self._SubmitJobUnlocked(job_id, ops)
2219 except errors.GenericError, err:
2220 status = False
2221 data = self._FormatSubmitError(str(err), ops)
2222 else:
2223 status = True
2224 data = job_id
2225 added_jobs.append(job)
2226
2227 results.append((status, data))
2228
2229 return (results, added_jobs)
2230
2231 @locking.ssynchronized(_LOCK)
2233 """Helper function to add jobs to worker pool's queue.
2234
2235 @type jobs: list
2236 @param jobs: List of all jobs
2237
2238 """
2239 return self._EnqueueJobsUnlocked(jobs)
2240
2242 """Helper function to add jobs to worker pool's queue.
2243
2244 @type jobs: list
2245 @param jobs: List of all jobs
2246
2247 """
2248 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2249 self._wpool.AddManyTasks([(job, ) for job in jobs],
2250 priority=[job.CalcPriority() for job in jobs])
2251
2253 """Gets the status of a job for dependencies.
2254
2255 @type job_id: string
2256 @param job_id: Job ID
2257 @raise errors.JobLost: If job can't be found
2258
2259 """
2260 if not isinstance(job_id, basestring):
2261 job_id = self._FormatJobID(job_id)
2262
2263
2264
2265
2266 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2267
2268 assert not job.writable, "Got writable job"
2269
2270 if job:
2271 return job.CalcStatus()
2272
2273 raise errors.JobLost("Job %s not found" % job_id)
2274
2275 @_RequireOpenQueue
2277 """Update a job's on disk storage.
2278
2279 After a job has been modified, this function needs to be called in
2280 order to write the changes to disk and replicate them to the other
2281 nodes.
2282
2283 @type job: L{_QueuedJob}
2284 @param job: the changed job
2285 @type replicate: boolean
2286 @param replicate: whether to replicate the change to remote nodes
2287
2288 """
2289 if __debug__:
2290 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2291 assert (finalized ^ (job.end_timestamp is None))
2292 assert job.writable, "Can't update read-only job"
2293
2294 filename = self._GetJobPath(job.id)
2295 data = serializer.DumpJson(job.Serialize())
2296 logging.debug("Writing job %s to %s", job.id, filename)
2297 self._UpdateJobQueueFile(filename, data, replicate)
2298
2299 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2300 timeout):
2301 """Waits for changes in a job.
2302
2303 @type job_id: string
2304 @param job_id: Job identifier
2305 @type fields: list of strings
2306 @param fields: Which fields to check for changes
2307 @type prev_job_info: list or None
2308 @param prev_job_info: Last job information returned
2309 @type prev_log_serial: int
2310 @param prev_log_serial: Last job message serial number
2311 @type timeout: float
2312 @param timeout: maximum time to wait in seconds
2313 @rtype: tuple (job info, log entries)
2314 @return: a tuple of the job information as required via
2315 the fields parameter, and the log entries as a list
2316
2317 if the job has not changed and the timeout has expired,
2318 we instead return a special value,
2319 L{constants.JOB_NOTCHANGED}, which should be interpreted
2320 as such by the clients
2321
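Sketch of a hypothetical polling client (not part of this module)::

  prev_info = prev_serial = None
  while True:
    result = queue.WaitForJobChanges(job_id, ["status"],
                                     prev_info, prev_serial, 10.0)
    if result is None:
      break  # job was lost or archived
    if result == constants.JOB_NOTCHANGED:
      continue  # timeout expired, poll again
    (prev_info, log_entries) = result
    if log_entries:
      prev_serial = log_entries[-1][0]  # first tuple field is the serial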
2322 """
2323 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2324 writable=False)
2325
2326 helper = _WaitForJobChangesHelper()
2327
2328 return helper(self._GetJobPath(job_id), load_fn,
2329 fields, prev_job_info, prev_log_serial, timeout)
2330
2331 @locking.ssynchronized(_LOCK)
2332 @_RequireOpenQueue
2334 """Cancels a job.
2335
2336 This will only succeed if the job has not started yet.
2337
2338 @type job_id: string
2339 @param job_id: job ID of job to be cancelled.
2340
2341 """
2342 logging.info("Cancelling job %s", job_id)
2343
2344 job = self._LoadJobUnlocked(job_id)
2345 if not job:
2346 logging.debug("Job %s not found", job_id)
2347 return (False, "Job %s not found" % job_id)
2348
2349 assert job.writable, "Can't cancel read-only job"
2350
2351 (success, msg) = job.Cancel()
2352
2353 if success:
2354
2355
2356 self.UpdateJobUnlocked(job)
2357
2358 return (success, msg)
2359
2360 @_RequireOpenQueue
2362 """Archives jobs.
2363
2364 @type jobs: list of L{_QueuedJob}
2365 @param jobs: Job objects
2366 @rtype: int
2367 @return: Number of archived jobs
2368
2369 """
2370 archive_jobs = []
2371 rename_files = []
2372 for job in jobs:
2373 assert job.writable, "Can't archive read-only job"
2374
2375 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2376 logging.debug("Job %s is not yet done", job.id)
2377 continue
2378
2379 archive_jobs.append(job)
2380
2381 old = self._GetJobPath(job.id)
2382 new = self._GetArchivedJobPath(job.id)
2383 rename_files.append((old, new))
2384
2385
2386 self._RenameFilesUnlocked(rename_files)
2387
2388 logging.debug("Successfully archived job(s) %s",
2389 utils.CommaJoin(job.id for job in archive_jobs))
2390
2391
2392
2393
2394
2395 self._UpdateQueueSizeUnlocked()
2396 return len(archive_jobs)
2397
2398 @locking.ssynchronized(_LOCK)
2399 @_RequireOpenQueue
2401 """Archives a job.
2402
2403 This is just a wrapper over L{_ArchiveJobsUnlocked}.
2404
2405 @type job_id: string
2406 @param job_id: Job ID of job to be archived.
2407 @rtype: bool
2408 @return: Whether job was archived
2409
2410 """
2411 logging.info("Archiving job %s", job_id)
2412
2413 job = self._LoadJobUnlocked(job_id)
2414 if not job:
2415 logging.debug("Job %s not found", job_id)
2416 return False
2417
2418 return self._ArchiveJobsUnlocked([job]) == 1
2419
2420 @locking.ssynchronized(_LOCK)
2421 @_RequireOpenQueue
2423 """Archives all jobs based on age.
2424
2425 The method will archive all jobs which are older than the age
2426 parameter. For jobs that don't have an end timestamp, the start
2427 timestamp will be considered. The special '-1' age will cause
2428 archival of all jobs (that are not running or queued).
2429
2430 @type age: int
2431 @param age: the minimum age in seconds
2432
2433 """
2434 logging.info("Archiving jobs with age more than %s seconds", age)
2435
2436 now = time.time()
2437 end_time = now + timeout
2438 archived_count = 0
2439 last_touched = 0
2440
2441 all_job_ids = self._GetJobIDsUnlocked()
2442 pending = []
2443 for idx, job_id in enumerate(all_job_ids):
2444 last_touched = idx + 1
2445
2446
2447
2448
2449 if time.time() > end_time:
2450 break
2451
2452
2453 job = self._LoadJobUnlocked(job_id)
2454 if job:
2455 if job.end_timestamp is None:
2456 if job.start_timestamp is None:
2457 job_age = job.received_timestamp
2458 else:
2459 job_age = job.start_timestamp
2460 else:
2461 job_age = job.end_timestamp
2462
2463 if age == -1 or now - job_age[0] > age:
2464 pending.append(job)
2465
2466
2467 if len(pending) >= 10:
2468 archived_count += self._ArchiveJobsUnlocked(pending)
2469 pending = []
2470
2471 if pending:
2472 archived_count += self._ArchiveJobsUnlocked(pending)
2473
2474 return (archived_count, len(all_job_ids) - last_touched)
2475
2476 - def _Query(self, fields, qfilter):
2477 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2478 namefield="id")
2479
2480 job_ids = qobj.RequestedNames()
2481
2482 list_all = (job_ids is None)
2483
2484 if list_all:
2485
2486
2487 job_ids = self._GetJobIDsUnlocked()
2488
2489 jobs = []
2490
2491 for job_id in job_ids:
2492 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2493 if job is not None or not list_all:
2494 jobs.append((job_id, job))
2495
2496 return (qobj, jobs, list_all)
2497
2499 """Returns a list of jobs in queue.
2500
2501 @type fields: sequence
2502 @param fields: List of wanted fields
2503 @type qfilter: None or query2 filter (list)
2504 @param qfilter: Query filter
2505
2506 """
2507 (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2508
2509 return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
2510
2512 """Returns a list of jobs in queue.
2513
2514 @type job_ids: list
2515 @param job_ids: sequence of job identifiers or None for all
2516 @type fields: list
2517 @param fields: names of fields to return
2518 @rtype: list
2519 @return: list one element per job, each element being list with
2520 the requested fields
2521
2522 """
2523 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2524
2525 (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2526
2527 return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
2528
2529 @locking.ssynchronized(_LOCK)
2531 """Prepare to stop the job queue.
2532
2533 Disables execution of jobs in the workerpool and returns whether there are
2534 any jobs currently running. If the latter is the case, the job queue is not
2535 yet ready for shutdown. Once this function returns C{False}, L{Shutdown} can
2536 be called without interfering with any job. Queued and unfinished jobs will
2537 be resumed next time.
2538
2539 Once this function has been called no new job submissions will be accepted
2540 (see L{_RequireNonDrainedQueue}).
2541
2542 @rtype: bool
2543 @return: Whether there are any running jobs
2544
2545 """
2546 if self._accepting_jobs:
2547 self._accepting_jobs = False
2548
2549
2550 self._wpool.SetActive(False)
2551
2552 return self._wpool.HasRunningTasks()
2553
2555 """Returns whether jobs are accepted.
2556
2557 Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2558 queue is shutting down.
2559
2560 @rtype: bool
2561
2562 """
2563 return self._accepting_jobs
2564
2565 @locking.ssynchronized(_LOCK)
2566 @_RequireOpenQueue
2568 """Stops the job queue.
2569
2570 This shutdowns all the worker threads an closes the queue.
2571
2572 """
2573 self._wpool.TerminateWorkers()
2574
2575 self._queue_filelock.Close()
2576 self._queue_filelock = None
2577