1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41
42 from pyinotify import pyinotify
43 except ImportError:
44 import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
65
66
67 JOBQUEUE_THREADS = 25
68
69
70 _LOCK = "_lock"
71 _QUEUE = "_queue"
72
73
74 _GetIdAttr = operator.attrgetter("id")
78 """Special exception to cancel a job.
79
80 """
81
84 """Special exception to abort a job when the job queue is shutting down.
85
86 """
87
90 """Returns the current timestamp.
91
92 @rtype: tuple
93 @return: the current time in the (seconds, microseconds) format
94
95 """
96 return utils.SplitTime(time.time())
97
105
108 """Wrapper for job queries.
109
110 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111
112 """
118
120 """Executes a job query using cached field list.
121
122 """
123 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
127 """Encapsulates an opcode object.
128
129 @ivar log: holds the execution log and consists of tuples
130 of the form C{(log_serial, timestamp, level, message)}
131 @ivar input: the OpCode we encapsulate
132 @ivar status: the current status
133 @ivar result: the result of the LU execution
134 @ivar start_timestamp: timestamp for the start of the execution
135 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136 @ivar end_timestamp: timestamp for the end of the execution
137
138 """
139 __slots__ = ["input", "status", "result", "log", "priority",
140 "start_timestamp", "exec_timestamp", "end_timestamp",
141 "__weakref__"]
142
144 """Initializes instances of this class.
145
146 @type op: L{opcodes.OpCode}
147 @param op: the opcode we encapsulate
148
149 """
150 self.input = op
151 self.status = constants.OP_STATUS_QUEUED
152 self.result = None
153 self.log = []
154 self.start_timestamp = None
155 self.exec_timestamp = None
156 self.end_timestamp = None
157
158
159 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160
161 @classmethod
163 """Restore the _QueuedOpCode from the serialized form.
164
165 @type state: dict
166 @param state: the serialized state
167 @rtype: _QueuedOpCode
168 @return: a new _QueuedOpCode instance
169
170 """
171 obj = _QueuedOpCode.__new__(cls)
172 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173 obj.status = state["status"]
174 obj.result = state["result"]
175 obj.log = state["log"]
176 obj.start_timestamp = state.get("start_timestamp", None)
177 obj.exec_timestamp = state.get("exec_timestamp", None)
178 obj.end_timestamp = state.get("end_timestamp", None)
179 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180 return obj
181
183 """Serializes this _QueuedOpCode.
184
185 @rtype: dict
186 @return: the dictionary holding the serialized state
187
188 """
189 return {
190 "input": self.input.__getstate__(),
191 "status": self.status,
192 "result": self.result,
193 "log": self.log,
194 "start_timestamp": self.start_timestamp,
195 "exec_timestamp": self.exec_timestamp,
196 "end_timestamp": self.end_timestamp,
197 "priority": self.priority,
198 }
199
202 """In-memory job representation.
203
204 This is what we use to track the user-submitted jobs. Locking must
205 be taken care of by users of this class.
206
207 @type queue: L{JobQueue}
208 @ivar queue: the parent queue
209 @ivar id: the job ID
210 @type ops: list
211 @ivar ops: the list of _QueuedOpCode that constitute the job
212 @type log_serial: int
213 @ivar log_serial: holds the index for the next log entry
214 @ivar received_timestamp: the timestamp for when the job was received
215 @ivar start_timestamp: the timestamp for start of execution
216 @ivar end_timestamp: the timestamp for end of execution
217 @ivar writable: Whether the job is allowed to be modified
218
219 """
220
221 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222 "received_timestamp", "start_timestamp", "end_timestamp",
223 "__weakref__", "processor_lock", "writable", "archived"]
224
def __init__(self, queue, job_id, ops, writable):
  """Creates a brand-new in-memory job.

  @type queue: L{JobQueue}
  @param queue: the queue this job belongs to
  @type job_id: int (or anything accepted by C{int()})
  @param job_id: identifier of the job; stored converted to an integer
  @type ops: list
  @param ops: opcodes making up the job; each one is wrapped in a
      L{_QueuedOpCode}
  @type writable: bool
  @param writable: Whether job can be modified
  @raise errors.GenericError: if C{ops} is empty

  """
  if not ops:
    raise errors.GenericError("A job needs at least one opcode")

  self.queue = queue
  self.id = int(job_id)

  # Wrap every submitted opcode so status, result and log can be tracked
  queued_ops = []
  for opcode in ops:
    queued_ops.append(_QueuedOpCode(opcode))
  self.ops = queued_ops

  # A new job has no log entries, was never started and is not archived
  self.log_serial = 0
  self.archived = False
  self.start_timestamp = None
  self.end_timestamp = None
  self.received_timestamp = TimeStampNow()

  # State which is never serialized (see Serialize)
  self._InitInMemory(self, writable)

  assert not self.archived, "New jobs can not be marked as archived"
254
255 @staticmethod
257 """Initializes in-memory variables.
258
259 """
260 obj.writable = writable
261 obj.ops_iter = None
262 obj.cur_opctx = None
263
264
265 if writable:
266 obj.processor_lock = threading.Lock()
267 else:
268 obj.processor_lock = None
269
271 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
272 "id=%s" % self.id,
273 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
274
275 return "<%s at %#x>" % (" ".join(status), id(self))
276
@classmethod
def Restore(cls, queue, state, writable, archived):
  """Rebuilds a job object from its serialized state.

  The constructor is bypassed on purpose; all attributes are filled in
  from C{state} instead.

  @type queue: L{JobQueue}
  @param queue: to which queue the restored job belongs
  @type state: dict
  @param state: the serialized state; must contain C{"id"} and C{"ops"},
      the timestamp entries are optional
  @type writable: bool
  @param writable: Whether job can be modified
  @type archived: bool
  @param archived: Whether job was already archived
  @rtype: L{_QueuedJob}
  @return: the restored L{_QueuedJob} instance

  """
  job = _QueuedJob.__new__(cls)
  job.queue = queue
  job.id = int(state["id"])
  job.archived = archived

  # Missing timestamps are restored as None
  job.received_timestamp = state.get("received_timestamp")
  job.start_timestamp = state.get("start_timestamp")
  job.end_timestamp = state.get("end_timestamp")

  job.ops = []
  job.log_serial = 0
  for serialized_op in state["ops"]:
    restored = _QueuedOpCode.Restore(serialized_op)

    # The first element of a log entry is its serial; remember the highest
    # one seen so far over all opcodes
    serials = [entry[0] for entry in restored.log]
    if serials:
      job.log_serial = max(job.log_serial, max(serials))

    job.ops.append(restored)

  cls._InitInMemory(job, writable)

  return job
312
314 """Serialize the _QueuedJob instance.
315
316 @rtype: dict
317 @return: the serialized state
318
319 """
320 return {
321 "id": self.id,
322 "ops": [op.Serialize() for op in self.ops],
323 "start_timestamp": self.start_timestamp,
324 "end_timestamp": self.end_timestamp,
325 "received_timestamp": self.received_timestamp,
326 }
327
380
382 """Gets the current priority for this job.
383
384 Only unfinished opcodes are considered. When all are done, the default
385 priority is used.
386
387 @rtype: int
388
389 """
390 priorities = [op.priority for op in self.ops
391 if op.status not in constants.OPS_FINALIZED]
392
393 if not priorities:
394
395 return constants.OP_PRIO_DEFAULT
396
397 return min(priorities)
398
400 """Selectively returns the log entries.
401
402 @type newer_than: None or int
403 @param newer_than: if this is None, return all log entries,
404 otherwise return only the log entries with serial higher
405 than this value
406 @rtype: list
407 @return: the list of the log entries selected
408
409 """
410 if newer_than is None:
411 serial = -1
412 else:
413 serial = newer_than
414
415 entries = []
416 for op in self.ops:
417 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
418
419 return entries
420
422 """Returns information about a job.
423
424 @type fields: list
425 @param fields: names of fields to return
426 @rtype: list
427 @return: list with one element for each field
428 @raise errors.OpExecError: when an invalid field
429 has been passed
430
431 """
432 return _SimpleJobQuery(fields)(self)
433
435 """Mark unfinished opcodes with a given status and result.
436
437 This is a utility function for marking all running or waiting to
438 be run opcodes with a given status. Opcodes which are already
439 finalised are not changed.
440
441 @param status: a given opcode status
442 @param result: the opcode result
443
444 """
445 not_marked = True
446 for op in self.ops:
447 if op.status in constants.OPS_FINALIZED:
448 assert not_marked, "Finalized opcodes found after non-finalized ones"
449 continue
450 op.status = status
451 op.result = result
452 not_marked = False
453
455 """Marks the job as finalized.
456
457 """
458 self.end_timestamp = TimeStampNow()
459
484
486 """Changes the job priority.
487
488 @type priority: int
489 @param priority: New priority
490 @rtype: tuple; (bool, string)
491 @return: Boolean describing whether job's priority was successfully changed
492 and a text message
493
494 """
495 status = self.CalcStatus()
496
497 if status in constants.JOBS_FINALIZED:
498 return (False, "Job %s is finished" % self.id)
499 elif status == constants.JOB_STATUS_CANCELING:
500 return (False, "Job %s is cancelling" % self.id)
501 else:
502 assert status in (constants.JOB_STATUS_QUEUED,
503 constants.JOB_STATUS_WAITING,
504 constants.JOB_STATUS_RUNNING)
505
506 changed = False
507 for op in self.ops:
508 if (op.status == constants.OP_STATUS_RUNNING or
509 op.status in constants.OPS_FINALIZED):
510 assert not changed, \
511 ("Found opcode for which priority should not be changed after"
512 " priority has been changed for previous opcodes")
513 continue
514
515 assert op.status in (constants.OP_STATUS_QUEUED,
516 constants.OP_STATUS_WAITING)
517
518 changed = True
519
520
521 op.priority = priority
522
523 if changed:
524 return (True, ("Priorities of pending opcodes for job %s have been"
525 " changed to %s" % (self.id, priority)))
526 else:
527 return (False, "Job %s had no pending opcodes" % self.id)
528
532 """Initializes this class.
533
534 @type queue: L{JobQueue}
535 @param queue: Job queue
536 @type job: L{_QueuedJob}
537 @param job: Job object
538 @type op: L{_QueuedOpCode}
539 @param op: OpCode
540
541 """
542 assert queue, "Queue is missing"
543 assert job, "Job is missing"
544 assert op, "Opcode is missing"
545
546 self._queue = queue
547 self._job = job
548 self._op = op
549
563
564 @locking.ssynchronized(_QUEUE, shared=1)
566 """Mark the opcode as running, not lock-waiting.
567
568 This is called from the mcpu code as a notifier function, when the LU is
569 finally about to start the Exec() method. Of course, to have end-user
570 visible results, the opcode must be initially (before calling into
571 Processor.ExecOpCode) set to OP_STATUS_WAITING.
572
573 """
574 assert self._op in self._job.ops
575 assert self._op.status in (constants.OP_STATUS_WAITING,
576 constants.OP_STATUS_CANCELING)
577
578
579 self._CheckCancel()
580
581 logging.debug("Opcode is now running")
582
583 self._op.status = constants.OP_STATUS_RUNNING
584 self._op.exec_timestamp = TimeStampNow()
585
586
587 self._queue.UpdateJobUnlocked(self._job)
588
589 @locking.ssynchronized(_QUEUE, shared=1)
591 """Internal feedback append function, with locks
592
593 """
594 self._job.log_serial += 1
595 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
596 self._queue.UpdateJobUnlocked(self._job, replicate=False)
597
614
626
628 """Submits jobs for processing.
629
630 See L{JobQueue.SubmitManyJobs}.
631
632 """
633
634 return self._queue.SubmitManyJobs(jobs)
635
def __init__(self, fields, prev_job_info, prev_log_serial):
  """Remembers the state a job is compared against.

  @type fields: list of strings
  @param fields: Fields requested by LUXI client; a L{_SimpleJobQuery} is
      built for them once and kept for later use
  @param prev_job_info: previous job info, as passed by the LUXI client
  @param prev_log_serial: previous job serial, as passed by the LUXI client

  """
  self._prev_log_serial = prev_log_serial
  self._prev_job_info = prev_job_info

  # Query object is created only once per checker
  self._squery = _SimpleJobQuery(fields)
652
689
def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
  """Sets up inotify-based watching of a job file.

  @type filename: string
  @param filename: Path to job file
  @param _inotify_wm_cls: class used to create the watch manager
      (replaceable, defaults to C{pyinotify.WatchManager})
  @raises errors.InotifyError: if the notifier cannot be setup

  """
  watch_manager = _inotify_wm_cls()
  self._wm = watch_manager

  handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                 self._OnInotify, filename)
  self._inotify_handler = handler

  self._notifier = pyinotify.Notifier(watch_manager,
                                      default_proc_fun=handler)

  try:
    handler.enable()
  except Exception:
    # Enabling the watch failed; stop the notifier which was just created
    # before passing the error on to the caller
    self._notifier.stop()
    raise
711
713 """Callback for inotify.
714
715 """
716 if not notifier_enabled:
717 self._inotify_handler.enable()
718
def Wait(self, timeout):
  """Waits for the job file to change.

  @type timeout: float
  @param timeout: Timeout in seconds; must not be negative
  @return: Whether there have been events (the value returned by the
      notifier's C{check_events})

  """
  assert timeout >= 0

  notifier = self._notifier

  # The timeout is given in seconds and handed on multiplied by 1000
  have_events = notifier.check_events(timeout * 1000)
  if not have_events:
    return have_events

  # Something is pending: read the events and have them processed
  notifier.read_events()
  notifier.process_events()

  return have_events
733
735 """Closes underlying notifier and its file descriptor.
736
737 """
738 self._notifier.stop()
739
743 """Initializes this class.
744
745 @type filename: string
746 @param filename: Path to job file
747
748 """
749 self._filewaiter = None
750 self._filename = filename
751 self._waiter_cls = _waiter_cls
752
def Wait(self, timeout):
  """Waits for a job to change.

  The file waiter is created lazily: the first call only instantiates it
  and returns C{True} without waiting; later calls delegate to it.

  @type timeout: float
  @param timeout: Timeout in seconds
  @return: Whether there have been events

  """
  waiter = self._filewaiter

  if not waiter:
    # No waiter yet: create it and report "changed" right away instead of
    # blocking (presumably so the caller re-checks the job once the watch
    # is in place -- confirm against the retry loop using this method)
    self._filewaiter = self._waiter_cls(self._filename)
    return True

  return waiter.Wait(timeout)
771
773 """Closes underlying waiter.
774
775 """
776 if self._filewaiter:
777 self._filewaiter.Close()
778
781 """Helper class using inotify to wait for changes in a job file.
782
783 This class takes a previous job status and serial, and alerts the client when
784 the current job status has changed.
785
786 """
787 @staticmethod
789 if counter.next() > 0:
790
791
792
793 time.sleep(0.1)
794
795 job = job_load_fn()
796 if not job:
797 raise errors.JobLost()
798
799 result = check_fn(job)
800 if result is None:
801 raise utils.RetryAgain()
802
803 return result
804
def __call__(self, filename, job_load_fn,
             fields, prev_job_info, prev_log_serial, timeout,
             _waiter_cls=_JobChangesWaiter):
  """Waits for changes on a job.

  @type filename: string
  @param filename: File on which to wait for changes
  @type job_load_fn: callable
  @param job_load_fn: Function to load job
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @param _waiter_cls: class instantiated with C{filename} to wait between
      checks (replaceable)
  @return: the result of the change check once it reports something,
      C{None} if the job was lost or L{constants.JOB_NOTCHANGED} if the
      timeout expired first

  """
  attempts = itertools.count()

  try:
    checker = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
    waiter = _waiter_cls(filename)
    try:
      poll_fn = compat.partial(self._CheckForChanges,
                               attempts, job_load_fn, checker)
      return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=waiter.Wait)
    finally:
      # The waiter is always closed again, whatever the outcome
      waiter.Close()
  except errors.JobLost:
    return None
  except utils.RetryTimeout:
    return constants.JOB_NOTCHANGED
839
851
855 """Initializes this class.
856
857 """
858 self._fn = fn
859 self._next = None
860
862 """Gets the next timeout if necessary.
863
864 """
865 if self._next is None:
866 self._next = self._fn()
867
869 """Returns the next timeout.
870
871 """
872 self._Advance()
873 return self._next
874
876 """Returns the current timeout and advances the internal state.
877
878 """
879 self._Advance()
880 result = self._next
881 self._next = None
882 return result
883
884
885 -class _OpExecContext:
def __init__(self, op, index, log_prefix, timeout_strategy_factory):
  """Initializes this class.

  @type op: L{_QueuedOpCode}
  @param op: Opcode to be executed
  @type index: int
  @param index: Position of the opcode within its job
  @type log_prefix: string
  @param log_prefix: Prefix used for log messages about this opcode
  @type timeout_strategy_factory: callable
  @param timeout_strategy_factory: Called without arguments to create a new
      timeout strategy; the result must provide C{NextAttempt}

  """
  self.op = op
  self.index = index
  self.log_prefix = log_prefix
  self.summary = op.input.Summary()

  # Look the dependencies up once, through the same attribute name used for
  # the check, and keep a private copy: entries are removed from this list
  # as dependencies are resolved, which must not alter the opcode itself
  depends = getattr(op.input, opcodes.DEPEND_ATTR, None)
  if depends:
    self.jobdeps = depends[:]
  else:
    self.jobdeps = None

  self._timeout_strategy_factory = timeout_strategy_factory
  self._ResetTimeoutStrategy()
903
905 """Creates a new timeout strategy.
906
907 """
908 self._timeout_strategy = \
909 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
910
912 """Checks whether priority can and should be increased.
913
914 Called when locks couldn't be acquired.
915
916 """
917 op = self.op
918
919
920
921 if (self._timeout_strategy.Peek() is None and
922 op.priority > constants.OP_PRIO_HIGHEST):
923 logging.debug("Increasing priority")
924 op.priority -= 1
925 self._ResetTimeoutStrategy()
926 return True
927
928 return False
929
931 """Returns the next lock acquire timeout.
932
933 """
934 return self._timeout_strategy.Next()
935
938 (DEFER,
939 WAITDEP,
940 FINISHED) = range(1, 4)
941
944 """Initializes this class.
945
946 """
947 self.queue = queue
948 self.opexec_fn = opexec_fn
949 self.job = job
950 self._timeout_strategy_factory = _timeout_strategy_factory
951
952 @staticmethod
954 """Locates the next opcode to run.
955
956 @type job: L{_QueuedJob}
957 @param job: Job object
958 @param timeout_strategy_factory: Callable to create new timeout strategy
959
960 """
961
962
963
964
965 if job.ops_iter is None:
966 job.ops_iter = enumerate(job.ops)
967
968
969 while True:
970 try:
971 (idx, op) = job.ops_iter.next()
972 except StopIteration:
973 raise errors.ProgrammerError("Called for a finished job")
974
975 if op.status == constants.OP_STATUS_RUNNING:
976
977 raise errors.ProgrammerError("Called for job marked as running")
978
979 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
980 timeout_strategy_factory)
981
982 if op.status not in constants.OPS_FINALIZED:
983 return opctx
984
985
986
987
988
989 logging.info("%s: opcode %s already processed, skipping",
990 opctx.log_prefix, opctx.summary)
991
992 @staticmethod
994 """Marks an opcode as waiting for locks.
995
996 The job's start timestamp is also set if necessary.
997
998 @type job: L{_QueuedJob}
999 @param job: Job object
1000 @type op: L{_QueuedOpCode}
1001 @param op: Opcode object
1002
1003 """
1004 assert op in job.ops
1005 assert op.status in (constants.OP_STATUS_QUEUED,
1006 constants.OP_STATUS_WAITING)
1007
1008 update = False
1009
1010 op.result = None
1011
1012 if op.status == constants.OP_STATUS_QUEUED:
1013 op.status = constants.OP_STATUS_WAITING
1014 update = True
1015
1016 if op.start_timestamp is None:
1017 op.start_timestamp = TimeStampNow()
1018 update = True
1019
1020 if job.start_timestamp is None:
1021 job.start_timestamp = op.start_timestamp
1022 update = True
1023
1024 assert op.status == constants.OP_STATUS_WAITING
1025
1026 return update
1027
1028 @staticmethod
1030 """Checks if an opcode has dependencies and if so, processes them.
1031
1032 @type queue: L{JobQueue}
1033 @param queue: Queue object
1034 @type job: L{_QueuedJob}
1035 @param job: Job object
1036 @type opctx: L{_OpExecContext}
1037 @param opctx: Opcode execution context
1038 @rtype: bool
1039 @return: Whether opcode will be re-scheduled by dependency tracker
1040
1041 """
1042 op = opctx.op
1043
1044 result = False
1045
1046 while opctx.jobdeps:
1047 (dep_job_id, dep_status) = opctx.jobdeps[0]
1048
1049 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1050 dep_status)
1051 assert ht.TNonEmptyString(depmsg), "No dependency message"
1052
1053 logging.info("%s: %s", opctx.log_prefix, depmsg)
1054
1055 if depresult == _JobDependencyManager.CONTINUE:
1056
1057 opctx.jobdeps.pop(0)
1058
1059 elif depresult == _JobDependencyManager.WAIT:
1060
1061
1062 result = True
1063 break
1064
1065 elif depresult == _JobDependencyManager.CANCEL:
1066
1067 job.Cancel()
1068 assert op.status == constants.OP_STATUS_CANCELING
1069 break
1070
1071 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1072 _JobDependencyManager.ERROR):
1073
1074 op.status = constants.OP_STATUS_ERROR
1075 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1076 break
1077
1078 else:
1079 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1080 depresult)
1081
1082 return result
1083
1085 """Processes one opcode and returns the result.
1086
1087 """
1088 op = opctx.op
1089
1090 assert op.status == constants.OP_STATUS_WAITING
1091
1092 timeout = opctx.GetNextLockTimeout()
1093
1094 try:
1095
1096 result = self.opexec_fn(op.input,
1097 _OpExecCallbacks(self.queue, self.job, op),
1098 timeout=timeout)
1099 except mcpu.LockAcquireTimeout:
1100 assert timeout is not None, "Received timeout for blocking acquire"
1101 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1102
1103 assert op.status in (constants.OP_STATUS_WAITING,
1104 constants.OP_STATUS_CANCELING)
1105
1106
1107 if op.status == constants.OP_STATUS_CANCELING:
1108 return (constants.OP_STATUS_CANCELING, None)
1109
1110
1111 if not self.queue.AcceptingJobsUnlocked():
1112 return (constants.OP_STATUS_QUEUED, None)
1113
1114
1115 return (constants.OP_STATUS_WAITING, None)
1116 except CancelJob:
1117 logging.exception("%s: Canceling job", opctx.log_prefix)
1118 assert op.status == constants.OP_STATUS_CANCELING
1119 return (constants.OP_STATUS_CANCELING, None)
1120
1121 except QueueShutdown:
1122 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1123
1124 assert op.status == constants.OP_STATUS_WAITING
1125
1126
1127 return (constants.OP_STATUS_QUEUED, None)
1128
1129 except Exception, err:
1130 logging.exception("%s: Caught exception in %s",
1131 opctx.log_prefix, opctx.summary)
1132 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1133 else:
1134 logging.debug("%s: %s successful",
1135 opctx.log_prefix, opctx.summary)
1136 return (constants.OP_STATUS_SUCCESS, result)
1137
1139 """Continues execution of a job.
1140
1141 @param _nextop_fn: Callback function for tests
1142 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1143 be deferred and C{WAITDEP} if the dependency manager
1144 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1145
1146 """
1147 queue = self.queue
1148 job = self.job
1149
1150 logging.debug("Processing job %s", job.id)
1151
1152 queue.acquire(shared=1)
1153 try:
1154 opcount = len(job.ops)
1155
1156 assert job.writable, "Expected writable job"
1157
1158
1159 if job.CalcStatus() in constants.JOBS_FINALIZED:
1160 return self.FINISHED
1161
1162
1163 if job.cur_opctx:
1164 opctx = job.cur_opctx
1165 job.cur_opctx = None
1166 else:
1167 if __debug__ and _nextop_fn:
1168 _nextop_fn()
1169 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1170
1171 op = opctx.op
1172
1173
1174 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1175 constants.OP_STATUS_CANCELING)
1176 for i in job.ops[opctx.index + 1:])
1177
1178 assert op.status in (constants.OP_STATUS_QUEUED,
1179 constants.OP_STATUS_WAITING,
1180 constants.OP_STATUS_CANCELING)
1181
1182 assert (op.priority <= constants.OP_PRIO_LOWEST and
1183 op.priority >= constants.OP_PRIO_HIGHEST)
1184
1185 waitjob = None
1186
1187 if op.status != constants.OP_STATUS_CANCELING:
1188 assert op.status in (constants.OP_STATUS_QUEUED,
1189 constants.OP_STATUS_WAITING)
1190
1191
1192 if self._MarkWaitlock(job, op):
1193
1194 queue.UpdateJobUnlocked(job)
1195
1196 assert op.status == constants.OP_STATUS_WAITING
1197 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1198 assert job.start_timestamp and op.start_timestamp
1199 assert waitjob is None
1200
1201
1202 waitjob = self._CheckDependencies(queue, job, opctx)
1203
1204 assert op.status in (constants.OP_STATUS_WAITING,
1205 constants.OP_STATUS_CANCELING,
1206 constants.OP_STATUS_ERROR)
1207
1208 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1209 constants.OP_STATUS_ERROR)):
1210 logging.info("%s: opcode %s waiting for locks",
1211 opctx.log_prefix, opctx.summary)
1212
1213 assert not opctx.jobdeps, "Not all dependencies were removed"
1214
1215 queue.release()
1216 try:
1217 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1218 finally:
1219 queue.acquire(shared=1)
1220
1221 op.status = op_status
1222 op.result = op_result
1223
1224 assert not waitjob
1225
1226 if op.status in (constants.OP_STATUS_WAITING,
1227 constants.OP_STATUS_QUEUED):
1228
1229
1230 assert not op.end_timestamp
1231 else:
1232
1233 op.end_timestamp = TimeStampNow()
1234
1235 if op.status == constants.OP_STATUS_CANCELING:
1236 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1237 for i in job.ops[opctx.index:])
1238 else:
1239 assert op.status in constants.OPS_FINALIZED
1240
1241 if op.status == constants.OP_STATUS_QUEUED:
1242
1243 assert not waitjob
1244
1245 finalize = False
1246
1247
1248 job.cur_opctx = None
1249
1250
1251 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1252
1253 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1254 finalize = False
1255
1256 if not waitjob and opctx.CheckPriorityIncrease():
1257
1258 queue.UpdateJobUnlocked(job)
1259
1260
1261 job.cur_opctx = opctx
1262
1263 assert (op.priority <= constants.OP_PRIO_LOWEST and
1264 op.priority >= constants.OP_PRIO_HIGHEST)
1265
1266
1267 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1268
1269 else:
1270
1271 assert (opctx.index == 0 or
1272 compat.all(i.status == constants.OP_STATUS_SUCCESS
1273 for i in job.ops[:opctx.index]))
1274
1275
1276 job.cur_opctx = None
1277
1278 if op.status == constants.OP_STATUS_SUCCESS:
1279 finalize = False
1280
1281 elif op.status == constants.OP_STATUS_ERROR:
1282
1283 assert errors.GetEncodedError(job.ops[opctx.index].result)
1284
1285 to_encode = errors.OpExecError("Preceding opcode failed")
1286 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1287 _EncodeOpError(to_encode))
1288 finalize = True
1289
1290
1291 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1292 errors.GetEncodedError(i.result)
1293 for i in job.ops[opctx.index:])
1294
1295 elif op.status == constants.OP_STATUS_CANCELING:
1296 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1297 "Job canceled by request")
1298 finalize = True
1299
1300 else:
1301 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1302
1303 if opctx.index == (opcount - 1):
1304
1305 finalize = True
1306
1307 if finalize:
1308
1309 job.Finalize()
1310
1311
1312
1313 queue.UpdateJobUnlocked(job)
1314
1315 assert not waitjob
1316
1317 if finalize:
1318 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1319 return self.FINISHED
1320
1321 assert not waitjob or queue.depmgr.JobWaiting(job)
1322
1323 if waitjob:
1324 return self.WAITDEP
1325 else:
1326 return self.DEFER
1327 finally:
1328 assert job.writable, "Job became read-only while being processed"
1329 queue.release()
1330
1353
1356 """The actual job workers.
1357
1358 """
1360 """Job executor.
1361
1362 @type job: L{_QueuedJob}
1363 @param job: the job to be processed
1364
1365 """
1366 assert job.writable, "Expected writable job"
1367
1368
1369
1370
1371 job.processor_lock.acquire()
1372 try:
1373 return self._RunTaskInner(job)
1374 finally:
1375 job.processor_lock.release()
1376
1397
1398 @staticmethod
1400 """Updates the worker thread name to include a short summary of the opcode.
1401
1402 @param setname_fn: Callable setting worker thread name
1403 @param execop_fn: Callable for executing opcode (usually
1404 L{mcpu.Processor.ExecOpCode})
1405
1406 """
1407 setname_fn(op)
1408 try:
1409 return execop_fn(op, *args, **kwargs)
1410 finally:
1411 setname_fn(None)
1412
1413 @staticmethod
1415 """Sets the worker thread name.
1416
1417 @type job: L{_QueuedJob}
1418 @type op: L{opcodes.OpCode}
1419
1420 """
1421 parts = ["Job%s" % job.id]
1422
1423 if op:
1424 parts.append(op.TinySummary())
1425
1426 return "/".join(parts)
1427
1430 """Simple class implementing a job-processing workerpool.
1431
1432 """
1438
1441 """Keeps track of job dependencies.
1442
1443 """
1444 (WAIT,
1445 ERROR,
1446 CANCEL,
1447 CONTINUE,
1448 WRONGSTATUS) = range(1, 6)
1449
def __init__(self, getstatus_fn, enqueue_fn):
  """Initializes this class.

  @type getstatus_fn: callable
  @param getstatus_fn: Called with a job ID to get that job's status
  @type enqueue_fn: callable
  @param enqueue_fn: Called with the jobs which have to be re-added for
      processing once the job they waited for is done

  """
  # Protects the waiter bookkeeping below
  self._lock = locking.SharedLock("JobDepMgr")

  # Maps the ID of a job being waited for to the set of jobs waiting for it
  self._waiters = {}

  self._enqueue_fn = enqueue_fn
  self._getstatus_fn = getstatus_fn
1459
1460 @locking.ssynchronized(_LOCK, shared=1)
1462 """Retrieves information about waiting jobs.
1463
1464 @type requested: set
1465 @param requested: Requested information, see C{query.LQ_*}
1466
1467 """
1468
1469
1470
1471 return [("job/%s" % job_id, None, None,
1472 [("job", [job.id for job in waiters])])
1473 for job_id, waiters in self._waiters.items()
1474 if waiters]
1475
1476 @locking.ssynchronized(_LOCK, shared=1)
1478 """Checks if a job is waiting.
1479
1480 """
1481 return compat.any(job in jobs
1482 for jobs in self._waiters.values())
1483
1484 @locking.ssynchronized(_LOCK)
1486 """Checks if a dependency job has the requested status.
1487
1488 If the other job is not yet in a finalized status, the calling job will be
1489 notified (re-added to the workerpool) at a later point.
1490
1491 @type job: L{_QueuedJob}
1492 @param job: Job object
1493 @type dep_job_id: int
1494 @param dep_job_id: ID of dependency job
1495 @type dep_status: list
1496 @param dep_status: Required status
1497
1498 """
1499 assert ht.TJobId(job.id)
1500 assert ht.TJobId(dep_job_id)
1501 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1502
1503 if job.id == dep_job_id:
1504 return (self.ERROR, "Job can't depend on itself")
1505
1506
1507 try:
1508 status = self._getstatus_fn(dep_job_id)
1509 except errors.JobLost, err:
1510 return (self.ERROR, "Dependency error: %s" % err)
1511
1512 assert status in constants.JOB_STATUS_ALL
1513
1514 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1515
1516 if status not in constants.JOBS_FINALIZED:
1517
1518 job_id_waiters.add(job)
1519 return (self.WAIT,
1520 "Need to wait for job %s, wanted status '%s'" %
1521 (dep_job_id, dep_status))
1522
1523
1524 if job in job_id_waiters:
1525 job_id_waiters.remove(job)
1526
1527 if (status == constants.JOB_STATUS_CANCELED and
1528 constants.JOB_STATUS_CANCELED not in dep_status):
1529 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1530
1531 elif not dep_status or status in dep_status:
1532 return (self.CONTINUE,
1533 "Dependency job %s finished with status '%s'" %
1534 (dep_job_id, status))
1535
1536 else:
1537 return (self.WRONGSTATUS,
1538 "Dependency job %s finished with status '%s',"
1539 " not one of '%s' as required" %
1540 (dep_job_id, status, utils.CommaJoin(dep_status)))
1541
1543 """Remove all jobs without actual waiters.
1544
1545 """
1546 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1547 if not waiters]:
1548 del self._waiters[job_id]
1549
1551 """Notifies all jobs waiting for a certain job ID.
1552
1553 @attention: Do not call until L{CheckAndRegister} returned a status other
1554 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1555 @type job_id: int
1556 @param job_id: Job ID
1557
1558 """
1559 assert ht.TJobId(job_id)
1560
1561 self._lock.acquire()
1562 try:
1563 self._RemoveEmptyWaitersUnlocked()
1564
1565 jobs = self._waiters.pop(job_id, None)
1566 finally:
1567 self._lock.release()
1568
1569 if jobs:
1570
1571 logging.debug("Re-adding %s jobs which were waiting for job %s",
1572 len(jobs), job_id)
1573 self._enqueue_fn(jobs)
1574
1577 """Decorator for "public" functions.
1578
1579 This function should be used for all 'public' functions. That is,
1580 functions usually called from other classes. Note that this should
1581 be applied only to methods (not plain functions), since it expects
1582 that the decorated function is called with a first argument that has
1583 a '_queue_filelock' argument.
1584
1585 @warning: Use this decorator only after locking.ssynchronized
1586
1587 Example::
1588 @locking.ssynchronized(_LOCK)
1589 @_RequireOpenQueue
1590 def Example(self):
1591 pass
1592
1593 """
1594 def wrapper(self, *args, **kwargs):
1595
1596 assert self._queue_filelock is not None, "Queue should be open"
1597 return fn(self, *args, **kwargs)
1598 return wrapper
1599
1602 """Decorator checking for a non-drained queue.
1603
1604 To be used with functions submitting new jobs.
1605
1606 """
1607 def wrapper(self, *args, **kwargs):
1608 """Wrapper function.
1609
1610 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1611
1612 """
1613
1614
1615
1616 if self._drained:
1617 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1618
1619 if not self._accepting_jobs:
1620 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1621
1622 return fn(self, *args, **kwargs)
1623 return wrapper
1624
1627 """Queue used to manage the jobs.
1628
1629 """
1631 """Constructor for JobQueue.
1632
1633 The constructor will initialize the job queue object and then
1634 start loading the current jobs from disk, either for starting them
1635 (if they were queue) or for aborting them (if they were already
1636 running).
1637
1638 @type context: GanetiContext
1639 @param context: the context object for access to the configuration
1640 data and other ganeti objects
1641
1642 """
1643 self.context = context
1644 self._memcache = weakref.WeakValueDictionary()
1645 self._my_hostname = netutils.Hostname.GetSysName()
1646
1647
1648
1649
1650
1651
1652 self._lock = locking.SharedLock("JobQueue")
1653
1654 self.acquire = self._lock.acquire
1655 self.release = self._lock.release
1656
1657
1658 self._accepting_jobs = True
1659
1660
1661
1662 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1663
1664
1665 self._last_serial = jstore.ReadSerial()
1666 assert self._last_serial is not None, ("Serial file was modified between"
1667 " check in jstore and here")
1668
1669
1670 self._nodes = dict((n.name, n.primary_ip)
1671 for n in self.context.cfg.GetAllNodesInfo().values()
1672 if n.master_candidate)
1673
1674
1675 self._nodes.pop(self._my_hostname, None)
1676
1677
1678
1679 self._queue_size = None
1680 self._UpdateQueueSizeUnlocked()
1681 assert ht.TInt(self._queue_size)
1682 self._drained = jstore.CheckDrainFlag()
1683
1684
1685 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1686 self._EnqueueJobs)
1687 self.context.glm.AddToLockMonitor(self.depmgr)
1688
1689
1690 self._wpool = _JobQueueWorkerPool(self)
1691 try:
1692 self._InspectQueue()
1693 except:
1694 self._wpool.TerminateWorkers()
1695 raise
1696
1697 @locking.ssynchronized(_LOCK)
1698 @_RequireOpenQueue
1700 """Loads the whole job queue and resumes unfinished jobs.
1701
1702 This function needs the lock here because WorkerPool.AddTask() may start a
1703 job while we're still doing our work.
1704
1705 """
1706 logging.info("Inspecting job queue")
1707
1708 restartjobs = []
1709
1710 all_job_ids = self._GetJobIDsUnlocked()
1711 jobs_count = len(all_job_ids)
1712 lastinfo = time.time()
1713 for idx, job_id in enumerate(all_job_ids):
1714
1715 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1716 idx == (jobs_count - 1)):
1717 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1718 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1719 lastinfo = time.time()
1720
1721 job = self._LoadJobUnlocked(job_id)
1722
1723
1724 if job is None:
1725 continue
1726
1727 status = job.CalcStatus()
1728
1729 if status == constants.JOB_STATUS_QUEUED:
1730 restartjobs.append(job)
1731
1732 elif status in (constants.JOB_STATUS_RUNNING,
1733 constants.JOB_STATUS_WAITING,
1734 constants.JOB_STATUS_CANCELING):
1735 logging.warning("Unfinished job %s found: %s", job.id, job)
1736
1737 if status == constants.JOB_STATUS_WAITING:
1738
1739 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1740 restartjobs.append(job)
1741 else:
1742 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1743 "Unclean master daemon shutdown")
1744 job.Finalize()
1745
1746 self.UpdateJobUnlocked(job)
1747
1748 if restartjobs:
1749 logging.info("Restarting %s jobs", len(restartjobs))
1750 self._EnqueueJobsUnlocked(restartjobs)
1751
1752 logging.info("Job queue inspection finished")
1753
1755 """Gets RPC runner with context.
1756
1757 """
1758 return rpc.JobQueueRunner(self.context, address_list)
1759
1760 @locking.ssynchronized(_LOCK)
1761 @_RequireOpenQueue
1763 """Register a new node with the queue.
1764
1765 @type node: L{objects.Node}
1766 @param node: the node object to be added
1767
1768 """
1769 node_name = node.name
1770 assert node_name != self._my_hostname
1771
1772
1773 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1774 msg = result.fail_msg
1775 if msg:
1776 logging.warning("Cannot cleanup queue directory on node %s: %s",
1777 node_name, msg)
1778
1779 if not node.master_candidate:
1780
1781 self._nodes.pop(node_name, None)
1782
1783 return
1784
1785
1786 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1787
1788
1789 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1790
1791
1792 addrs = [node.primary_ip]
1793
1794 for file_name in files:
1795
1796 content = utils.ReadFile(file_name)
1797
1798 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1799 file_name, content)
1800 msg = result[node_name].fail_msg
1801 if msg:
1802 logging.error("Failed to upload file %s to node %s: %s",
1803 file_name, node_name, msg)
1804
1805
1806 result = \
1807 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1808 self._drained)
1809 msg = result[node_name].fail_msg
1810 if msg:
1811 logging.error("Failed to set queue drained flag on node %s: %s",
1812 node_name, msg)
1813
1814 self._nodes[node_name] = node.primary_ip
1815
1816 @locking.ssynchronized(_LOCK)
1817 @_RequireOpenQueue
1819 """Callback called when removing nodes from the cluster.
1820
1821 @type node_name: str
1822 @param node_name: the name of the node to remove
1823
1824 """
1825 self._nodes.pop(node_name, None)
1826
1827 @staticmethod
1829 """Verifies the status of an RPC call.
1830
1831 Since we aim to keep consistency should this node (the current
1832 master) fail, we will log errors if our rpc fail, and especially
1833 log the case when more than half of the nodes fails.
1834
1835 @param result: the data as returned from the rpc call
1836 @type nodes: list
1837 @param nodes: the list of nodes we made the call to
1838 @type failmsg: str
1839 @param failmsg: the identifier to be used for logging
1840
1841 """
1842 failed = []
1843 success = []
1844
1845 for node in nodes:
1846 msg = result[node].fail_msg
1847 if msg:
1848 failed.append(node)
1849 logging.error("RPC call %s (%s) failed on node %s: %s",
1850 result[node].call, failmsg, node, msg)
1851 else:
1852 success.append(node)
1853
1854
1855 if (len(success) + 1) < len(failed):
1856
1857 logging.error("More than half of the nodes failed")
1858
1860 """Helper for returning the node name/ip list.
1861
1862 @rtype: (list, list)
1863 @return: a tuple of two lists, the first one with the node
1864 names and the second one with the node addresses
1865
1866 """
1867
1868 name_list = self._nodes.keys()
1869 addr_list = [self._nodes[name] for name in name_list]
1870 return name_list, addr_list
1871
1873 """Writes a file locally and then replicates it to all nodes.
1874
1875 This function will replace the contents of a file on the local
1876 node and then replicate it to all the other nodes we have.
1877
1878 @type file_name: str
1879 @param file_name: the path of the file to be replicated
1880 @type data: str
1881 @param data: the new contents of the file
1882 @type replicate: boolean
1883 @param replicate: whether to spread the changes to the remote nodes
1884
1885 """
1886 getents = runtime.GetEnts()
1887 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1888 gid=getents.daemons_gid,
1889 mode=constants.JOB_QUEUE_FILES_PERMS)
1890
1891 if replicate:
1892 names, addrs = self._GetNodeIp()
1893 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1894 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1895
1897 """Renames a file locally and then replicate the change.
1898
1899 This function will rename a file in the local queue directory
1900 and then replicate this rename to all the other nodes we have.
1901
1902 @type rename: list of (old, new)
1903 @param rename: List containing tuples mapping old to new names
1904
1905 """
1906
1907 for old, new in rename:
1908 utils.RenameFile(old, new, mkdir=True)
1909
1910
1911 names, addrs = self._GetNodeIp()
1912 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1913 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1914
1916 """Generates a new job identifier.
1917
1918 Job identifiers are unique during the lifetime of a cluster.
1919
1920 @type count: integer
1921 @param count: how many serials to return
1922 @rtype: list of int
1923 @return: a list of job identifiers.
1924
1925 """
1926 assert ht.TNonNegativeInt(count)
1927
1928
1929 serial = self._last_serial + count
1930
1931
1932 self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1933 "%s\n" % serial, True)
1934
1935 result = [jstore.FormatJobID(v)
1936 for v in range(self._last_serial + 1, serial + 1)]
1937
1938
1939 self._last_serial = serial
1940
1941 assert len(result) == count
1942
1943 return result
1944
1945 @staticmethod
1947 """Returns the job file for a given job id.
1948
1949 @type job_id: str
1950 @param job_id: the job identifier
1951 @rtype: str
1952 @return: the path to the job file
1953
1954 """
1955 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1956
1957 @staticmethod
1959 """Returns the archived job file for a give job id.
1960
1961 @type job_id: str
1962 @param job_id: the job identifier
1963 @rtype: str
1964 @return: the path to the archived job file
1965
1966 """
1967 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1968 jstore.GetArchiveDirectory(job_id),
1969 "job-%s" % job_id)
1970
1971 @staticmethod
1973 """Build list of directories containing job files.
1974
1975 @type archived: bool
1976 @param archived: Whether to include directories for archived jobs
1977 @rtype: list
1978
1979 """
1980 result = [pathutils.QUEUE_DIR]
1981
1982 if archived:
1983 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1984 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1985 utils.ListVisibleFiles(archive_path)))
1986
1987 return result
1988
1989 @classmethod
1991 """Return all known job IDs.
1992
1993 The method only looks at disk because it's a requirement that all
1994 jobs are present on disk (so in the _memcache we don't have any
1995 extra IDs).
1996
1997 @type sort: boolean
1998 @param sort: perform sorting on the returned job ids
1999 @rtype: list
2000 @return: the list of job IDs
2001
2002 """
2003 jlist = []
2004
2005 for path in cls._DetermineJobDirectories(archived):
2006 for filename in utils.ListVisibleFiles(path):
2007 m = constants.JOB_FILE_RE.match(filename)
2008 if m:
2009 jlist.append(int(m.group(1)))
2010
2011 if sort:
2012 jlist.sort()
2013 return jlist
2014
2016 """Loads a job from the disk or memory.
2017
2018 Given a job id, this will return the cached job object if
2019 existing, or try to load the job from the disk. If loading from
2020 disk, it will also add the job to the cache.
2021
2022 @type job_id: int
2023 @param job_id: the job id
2024 @rtype: L{_QueuedJob} or None
2025 @return: either None or the job object
2026
2027 """
2028 job = self._memcache.get(job_id, None)
2029 if job:
2030 logging.debug("Found job %s in memcache", job_id)
2031 assert job.writable, "Found read-only job in memcache"
2032 return job
2033
2034 try:
2035 job = self._LoadJobFromDisk(job_id, False)
2036 if job is None:
2037 return job
2038 except errors.JobFileCorrupted:
2039 old_path = self._GetJobPath(job_id)
2040 new_path = self._GetArchivedJobPath(job_id)
2041 if old_path == new_path:
2042
2043 logging.exception("Can't parse job %s", job_id)
2044 else:
2045
2046 logging.exception("Can't parse job %s, will archive.", job_id)
2047 self._RenameFilesUnlocked([(old_path, new_path)])
2048 return None
2049
2050 assert job.writable, "Job just loaded is not writable"
2051
2052 self._memcache[job_id] = job
2053 logging.debug("Added job %s to the cache", job_id)
2054 return job
2055
2057 """Load the given job file from disk.
2058
2059 Given a job file, read, load and restore it in a _QueuedJob format.
2060
2061 @type job_id: int
2062 @param job_id: job identifier
2063 @type try_archived: bool
2064 @param try_archived: Whether to try loading an archived job
2065 @rtype: L{_QueuedJob} or None
2066 @return: either None or the job object
2067
2068 """
2069 path_functions = [(self._GetJobPath, False)]
2070
2071 if try_archived:
2072 path_functions.append((self._GetArchivedJobPath, True))
2073
2074 raw_data = None
2075 archived = None
2076
2077 for (fn, archived) in path_functions:
2078 filepath = fn(job_id)
2079 logging.debug("Loading job from %s", filepath)
2080 try:
2081 raw_data = utils.ReadFile(filepath)
2082 except EnvironmentError, err:
2083 if err.errno != errno.ENOENT:
2084 raise
2085 else:
2086 break
2087
2088 if not raw_data:
2089 return None
2090
2091 if writable is None:
2092 writable = not archived
2093
2094 try:
2095 data = serializer.LoadJson(raw_data)
2096 job = _QueuedJob.Restore(self, data, writable, archived)
2097 except Exception, err:
2098 raise errors.JobFileCorrupted(err)
2099
2100 return job
2101
2103 """Load the given job file from disk.
2104
2105 Given a job file, read, load and restore it in a _QueuedJob format.
2106 In case of error reading the job, it gets returned as None, and the
2107 exception is logged.
2108
2109 @type job_id: int
2110 @param job_id: job identifier
2111 @type try_archived: bool
2112 @param try_archived: Whether to try loading an archived job
2113 @rtype: L{_QueuedJob} or None
2114 @return: either None or the job object
2115
2116 """
2117 try:
2118 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2119 except (errors.JobFileCorrupted, EnvironmentError):
2120 logging.exception("Can't load/parse job %s", job_id)
2121 return None
2122
2124 """Update the queue size.
2125
2126 """
2127 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2128
2129 @locking.ssynchronized(_LOCK)
2130 @_RequireOpenQueue
2132 """Sets the drain flag for the queue.
2133
2134 @type drain_flag: boolean
2135 @param drain_flag: Whether to set or unset the drain flag
2136
2137 """
2138
2139 jstore.SetDrainFlag(drain_flag)
2140
2141 self._drained = drain_flag
2142
2143
2144 (names, addrs) = self._GetNodeIp()
2145 result = \
2146 self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2147 self._CheckRpcResult(result, self._nodes,
2148 "Setting queue drain flag to %s" % drain_flag)
2149
2150 return True
2151
2152 @_RequireOpenQueue
2154 """Create and store a new job.
2155
2156 This enters the job into our job queue and also puts it on the new
2157 queue, in order for it to be picked up by the queue processors.
2158
2159 @type job_id: job ID
2160 @param job_id: the job ID for the new job
2161 @type ops: list
2162 @param ops: The list of OpCodes that will become the new job.
2163 @rtype: L{_QueuedJob}
2164 @return: the job object to be queued
2165 @raise errors.JobQueueFull: if the job queue has too many jobs in it
2166 @raise errors.GenericError: If an opcode is not valid
2167
2168 """
2169 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2170 raise errors.JobQueueFull()
2171
2172 job = _QueuedJob(self, job_id, ops, True)
2173
2174 for idx, op in enumerate(job.ops):
2175
2176 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2177 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2178 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2179 " are %s" % (idx, op.priority, allowed))
2180
2181
2182 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2183 if not opcodes.TNoRelativeJobDependencies(dependencies):
2184 raise errors.GenericError("Opcode %s has invalid dependencies, must"
2185 " match %s: %s" %
2186 (idx, opcodes.TNoRelativeJobDependencies,
2187 dependencies))
2188
2189
2190 self.UpdateJobUnlocked(job)
2191
2192 self._queue_size += 1
2193
2194 logging.debug("Adding new job %s to the cache", job_id)
2195 self._memcache[job_id] = job
2196
2197 return job
2198
2199 @locking.ssynchronized(_LOCK)
2200 @_RequireOpenQueue
2201 @_RequireNonDrainedQueue
2211
2212 @locking.ssynchronized(_LOCK)
2213 @_RequireOpenQueue
2214 @_RequireNonDrainedQueue
2216 """Create and store multiple jobs.
2217
2218 @see: L{_SubmitJobUnlocked}
2219
2220 """
2221 all_job_ids = self._NewSerialsUnlocked(len(jobs))
2222
2223 (results, added_jobs) = \
2224 self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2225
2226 self._EnqueueJobsUnlocked(added_jobs)
2227
2228 return results
2229
2230 @staticmethod
2237
2238 @staticmethod
2240 """Resolves relative job IDs in dependencies.
2241
2242 @type resolve_fn: callable
2243 @param resolve_fn: Function to resolve a relative job ID
2244 @type deps: list
2245 @param deps: Dependencies
2246 @rtype: tuple; (boolean, string or list)
2247 @return: If successful (first tuple item), the returned list contains
2248 resolved job IDs along with the requested status; if not successful,
2249 the second element is an error message
2250
2251 """
2252 result = []
2253
2254 for (dep_job_id, dep_status) in deps:
2255 if ht.TRelativeJobId(dep_job_id):
2256 assert ht.TInt(dep_job_id) and dep_job_id < 0
2257 try:
2258 job_id = resolve_fn(dep_job_id)
2259 except IndexError:
2260
2261 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2262 else:
2263 job_id = dep_job_id
2264
2265 result.append((job_id, dep_status))
2266
2267 return (True, result)
2268
2270 """Create and store multiple jobs.
2271
2272 @see: L{_SubmitJobUnlocked}
2273
2274 """
2275 results = []
2276 added_jobs = []
2277
2278 def resolve_fn(job_idx, reljobid):
2279 assert reljobid < 0
2280 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2281
2282 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2283 for op in ops:
2284 if getattr(op, opcodes.DEPEND_ATTR, None):
2285 (status, data) = \
2286 self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2287 op.depends)
2288 if not status:
2289
2290 assert ht.TNonEmptyString(data), "No error message"
2291 break
2292
2293 op.depends = data
2294 else:
2295 try:
2296 job = self._SubmitJobUnlocked(job_id, ops)
2297 except errors.GenericError, err:
2298 status = False
2299 data = self._FormatSubmitError(str(err), ops)
2300 else:
2301 status = True
2302 data = job_id
2303 added_jobs.append(job)
2304
2305 results.append((status, data))
2306
2307 return (results, added_jobs)
2308
2309 @locking.ssynchronized(_LOCK)
2311 """Helper function to add jobs to worker pool's queue.
2312
2313 @type jobs: list
2314 @param jobs: List of all jobs
2315
2316 """
2317 return self._EnqueueJobsUnlocked(jobs)
2318
2320 """Helper function to add jobs to worker pool's queue.
2321
2322 @type jobs: list
2323 @param jobs: List of all jobs
2324
2325 """
2326 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2327 self._wpool.AddManyTasks([(job, ) for job in jobs],
2328 priority=[job.CalcPriority() for job in jobs],
2329 task_id=map(_GetIdAttr, jobs))
2330
2332 """Gets the status of a job for dependencies.
2333
2334 @type job_id: int
2335 @param job_id: Job ID
2336 @raise errors.JobLost: If job can't be found
2337
2338 """
2339
2340
2341
2342 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2343
2344 assert not job.writable, "Got writable job"
2345
2346 if job:
2347 return job.CalcStatus()
2348
2349 raise errors.JobLost("Job %s not found" % job_id)
2350
2351 @_RequireOpenQueue
2353 """Update a job's on disk storage.
2354
2355 After a job has been modified, this function needs to be called in
2356 order to write the changes to disk and replicate them to the other
2357 nodes.
2358
2359 @type job: L{_QueuedJob}
2360 @param job: the changed job
2361 @type replicate: boolean
2362 @param replicate: whether to replicate the change to remote nodes
2363
2364 """
2365 if __debug__:
2366 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2367 assert (finalized ^ (job.end_timestamp is None))
2368 assert job.writable, "Can't update read-only job"
2369 assert not job.archived, "Can't update archived job"
2370
2371 filename = self._GetJobPath(job.id)
2372 data = serializer.DumpJson(job.Serialize())
2373 logging.debug("Writing job %s to %s", job.id, filename)
2374 self._UpdateJobQueueFile(filename, data, replicate)
2375
2376 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2377 timeout):
2378 """Waits for changes in a job.
2379
2380 @type job_id: int
2381 @param job_id: Job identifier
2382 @type fields: list of strings
2383 @param fields: Which fields to check for changes
2384 @type prev_job_info: list or None
2385 @param prev_job_info: Last job information returned
2386 @type prev_log_serial: int
2387 @param prev_log_serial: Last job message serial number
2388 @type timeout: float
2389 @param timeout: maximum time to wait in seconds
2390 @rtype: tuple (job info, log entries)
2391 @return: a tuple of the job information as required via
2392 the fields parameter, and the log entries as a list
2393
2394 if the job has not changed and the timeout has expired,
2395 we instead return a special value,
2396 L{constants.JOB_NOTCHANGED}, which should be interpreted
2397 as such by the clients
2398
2399 """
2400 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2401 writable=False)
2402
2403 helper = _WaitForJobChangesHelper()
2404
2405 return helper(self._GetJobPath(job_id), load_fn,
2406 fields, prev_job_info, prev_log_serial, timeout)
2407
2408 @locking.ssynchronized(_LOCK)
2409 @_RequireOpenQueue
2411 """Cancels a job.
2412
2413 This will only succeed if the job has not started yet.
2414
2415 @type job_id: int
2416 @param job_id: job ID of job to be cancelled.
2417
2418 """
2419 logging.info("Cancelling job %s", job_id)
2420
2421 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2422
2423 @locking.ssynchronized(_LOCK)
2424 @_RequireOpenQueue
2426 """Changes a job's priority.
2427
2428 @type job_id: int
2429 @param job_id: ID of the job whose priority should be changed
2430 @type priority: int
2431 @param priority: New priority
2432
2433 """
2434 logging.info("Changing priority of job %s to %s", job_id, priority)
2435
2436 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2437 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2438 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2439 (priority, allowed))
2440
2441 def fn(job):
2442 (success, msg) = job.ChangePriority(priority)
2443
2444 if success:
2445 try:
2446 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2447 except workerpool.NoSuchTask:
2448 logging.debug("Job %s is not in workerpool at this time", job.id)
2449
2450 return (success, msg)
2451
2452 return self._ModifyJobUnlocked(job_id, fn)
2453
2455 """Modifies a job.
2456
2457 @type job_id: int
2458 @param job_id: Job ID
2459 @type mod_fn: callable
2460 @param mod_fn: Modifying function, receiving job object as parameter,
2461 returning tuple of (status boolean, message string)
2462
2463 """
2464 job = self._LoadJobUnlocked(job_id)
2465 if not job:
2466 logging.debug("Job %s not found", job_id)
2467 return (False, "Job %s not found" % job_id)
2468
2469 assert job.writable, "Can't modify read-only job"
2470 assert not job.archived, "Can't modify archived job"
2471
2472 (success, msg) = mod_fn(job)
2473
2474 if success:
2475
2476
2477 self.UpdateJobUnlocked(job)
2478
2479 return (success, msg)
2480
2481 @_RequireOpenQueue
2483 """Archives jobs.
2484
2485 @type jobs: list of L{_QueuedJob}
2486 @param jobs: Job objects
2487 @rtype: int
2488 @return: Number of archived jobs
2489
2490 """
2491 archive_jobs = []
2492 rename_files = []
2493 for job in jobs:
2494 assert job.writable, "Can't archive read-only job"
2495 assert not job.archived, "Can't cancel archived job"
2496
2497 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2498 logging.debug("Job %s is not yet done", job.id)
2499 continue
2500
2501 archive_jobs.append(job)
2502
2503 old = self._GetJobPath(job.id)
2504 new = self._GetArchivedJobPath(job.id)
2505 rename_files.append((old, new))
2506
2507
2508 self._RenameFilesUnlocked(rename_files)
2509
2510 logging.debug("Successfully archived job(s) %s",
2511 utils.CommaJoin(job.id for job in archive_jobs))
2512
2513
2514
2515
2516
2517 self._UpdateQueueSizeUnlocked()
2518 return len(archive_jobs)
2519
2520 @locking.ssynchronized(_LOCK)
2521 @_RequireOpenQueue
2523 """Archives a job.
2524
2525 This is just a wrapper over L{_ArchiveJobsUnlocked}.
2526
2527 @type job_id: int
2528 @param job_id: Job ID of job to be archived.
2529 @rtype: bool
2530 @return: Whether job was archived
2531
2532 """
2533 logging.info("Archiving job %s", job_id)
2534
2535 job = self._LoadJobUnlocked(job_id)
2536 if not job:
2537 logging.debug("Job %s not found", job_id)
2538 return False
2539
2540 return self._ArchiveJobsUnlocked([job]) == 1
2541
2542 @locking.ssynchronized(_LOCK)
2543 @_RequireOpenQueue
2545 """Archives all jobs based on age.
2546
2547 The method will archive all jobs which are older than the age
2548 parameter. For jobs that don't have an end timestamp, the start
2549 timestamp will be considered. The special '-1' age will cause
2550 archival of all jobs (that are not running or queued).
2551
2552 @type age: int
2553 @param age: the minimum age in seconds
2554
2555 """
2556 logging.info("Archiving jobs with age more than %s seconds", age)
2557
2558 now = time.time()
2559 end_time = now + timeout
2560 archived_count = 0
2561 last_touched = 0
2562
2563 all_job_ids = self._GetJobIDsUnlocked()
2564 pending = []
2565 for idx, job_id in enumerate(all_job_ids):
2566 last_touched = idx + 1
2567
2568
2569
2570
2571 if time.time() > end_time:
2572 break
2573
2574
2575 job = self._LoadJobUnlocked(job_id)
2576 if job:
2577 if job.end_timestamp is None:
2578 if job.start_timestamp is None:
2579 job_age = job.received_timestamp
2580 else:
2581 job_age = job.start_timestamp
2582 else:
2583 job_age = job.end_timestamp
2584
2585 if age == -1 or now - job_age[0] > age:
2586 pending.append(job)
2587
2588
2589 if len(pending) >= 10:
2590 archived_count += self._ArchiveJobsUnlocked(pending)
2591 pending = []
2592
2593 if pending:
2594 archived_count += self._ArchiveJobsUnlocked(pending)
2595
2596 return (archived_count, len(all_job_ids) - last_touched)
2597
2598 - def _Query(self, fields, qfilter):
2599 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2600 namefield="id")
2601
2602
2603
2604
2605 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2606
2607 job_ids = qobj.RequestedNames()
2608
2609 list_all = (job_ids is None)
2610
2611 if list_all:
2612
2613
2614 job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2615
2616 jobs = []
2617
2618 for job_id in job_ids:
2619 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2620 if job is not None or not list_all:
2621 jobs.append((job_id, job))
2622
2623 return (qobj, jobs, list_all)
2624
2626 """Returns a list of jobs in queue.
2627
2628 @type fields: sequence
2629 @param fields: List of wanted fields
2630 @type qfilter: None or query2 filter (list)
2631 @param qfilter: Query filter
2632
2633 """
2634 (qobj, ctx, _) = self._Query(fields, qfilter)
2635
2636 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2637
2639 """Returns a list of jobs in queue.
2640
2641 @type job_ids: list
2642 @param job_ids: sequence of job identifiers or None for all
2643 @type fields: list
2644 @param fields: names of fields to return
2645 @rtype: list
2646 @return: list one element per job, each element being list with
2647 the requested fields
2648
2649 """
2650
2651 job_ids = [int(jid) for jid in job_ids]
2652 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2653
2654 (qobj, ctx, _) = self._Query(fields, qfilter)
2655
2656 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2657
2658 @locking.ssynchronized(_LOCK)
2660 """Prepare to stop the job queue.
2661
2662 Disables execution of jobs in the workerpool and returns whether there are
2663 any jobs currently running. If the latter is the case, the job queue is not
2664 yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2665 be called without interfering with any job. Queued and unfinished jobs will
2666 be resumed next time.
2667
2668 Once this function has been called no new job submissions will be accepted
2669 (see L{_RequireNonDrainedQueue}).
2670
2671 @rtype: bool
2672 @return: Whether there are any running jobs
2673
2674 """
2675 if self._accepting_jobs:
2676 self._accepting_jobs = False
2677
2678
2679 self._wpool.SetActive(False)
2680
2681 return self._wpool.HasRunningTasks()
2682
2684 """Returns whether jobs are accepted.
2685
2686 Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2687 queue is shutting down.
2688
2689 @rtype: bool
2690
2691 """
2692 return self._accepting_jobs
2693
2694 @locking.ssynchronized(_LOCK)
2695 @_RequireOpenQueue
2697 """Stops the job queue.
2698
2699 This shutdowns all the worker threads an closes the queue.
2700
2701 """
2702 self._wpool.TerminateWorkers()
2703
2704 self._queue_filelock.Close()
2705 self._queue_filelock = None
2706