1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48
49 try:
50
51 from pyinotify import pyinotify
52 except ImportError:
53 import pyinotify
54
55 from ganeti import asyncnotifier
56 from ganeti import constants
57 from ganeti import serializer
58 from ganeti import workerpool
59 from ganeti import locking
60 from ganeti import luxi
61 from ganeti import opcodes
62 from ganeti import opcodes_base
63 from ganeti import errors
64 from ganeti import mcpu
65 from ganeti import utils
66 from ganeti import jstore
67 import ganeti.rpc.node as rpc
68 from ganeti import runtime
69 from ganeti import netutils
70 from ganeti import compat
71 from ganeti import ht
72 from ganeti import query
73 from ganeti import qlang
74 from ganeti import pathutils
75 from ganeti import vcluster
76
77
#: Number of worker threads started for processing jobs (see module docstring)
JOBQUEUE_THREADS = 25


# Attribute names handed to the L{locking.ssynchronized} decorators in this
# module; they name the lock attribute to acquire on the decorated object
_LOCK = "_lock"
_QUEUE = "_queue"


# Key function extracting a job's numeric ID (e.g. for sorting jobs)
_GetIdAttr = operator.attrgetter("id")
89 """Special exception to cancel a job.
90
91 """
92
95 """Special exception to abort a job when the job queue is shutting down.
96
97 """
98
101 """Returns the current timestamp.
102
103 @rtype: tuple
104 @return: the current time in the (seconds, microseconds) format
105
106 """
107 return utils.SplitTime(time.time())
108
116
119 """Wrapper for job queries.
120
121 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
122
123 """
129
131 """Executes a job query using cached field list.
132
133 """
134 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
135
138 """Encapsulates an opcode object.
139
140 @ivar log: holds the execution log and consists of tuples
141 of the form C{(log_serial, timestamp, level, message)}
142 @ivar input: the OpCode we encapsulate
143 @ivar status: the current status
144 @ivar result: the result of the LU execution
145 @ivar start_timestamp: timestamp for the start of the execution
146 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
147 @ivar stop_timestamp: timestamp for the end of the execution
148
149 """
150 __slots__ = ["input", "status", "result", "log", "priority",
151 "start_timestamp", "exec_timestamp", "end_timestamp",
152 "__weakref__"]
153
155 """Initializes instances of this class.
156
157 @type op: L{opcodes.OpCode}
158 @param op: the opcode we encapsulate
159
160 """
161 self.input = op
162 self.status = constants.OP_STATUS_QUEUED
163 self.result = None
164 self.log = []
165 self.start_timestamp = None
166 self.exec_timestamp = None
167 self.end_timestamp = None
168
169
170 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
171
172 @classmethod
174 """Restore the _QueuedOpCode from the serialized form.
175
176 @type state: dict
177 @param state: the serialized state
178 @rtype: _QueuedOpCode
179 @return: a new _QueuedOpCode instance
180
181 """
182 obj = _QueuedOpCode.__new__(cls)
183 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
184 obj.status = state["status"]
185 obj.result = state["result"]
186 obj.log = state["log"]
187 obj.start_timestamp = state.get("start_timestamp", None)
188 obj.exec_timestamp = state.get("exec_timestamp", None)
189 obj.end_timestamp = state.get("end_timestamp", None)
190 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
191 return obj
192
194 """Serializes this _QueuedOpCode.
195
196 @rtype: dict
197 @return: the dictionary holding the serialized state
198
199 """
200 return {
201 "input": self.input.__getstate__(),
202 "status": self.status,
203 "result": self.result,
204 "log": self.log,
205 "start_timestamp": self.start_timestamp,
206 "exec_timestamp": self.exec_timestamp,
207 "end_timestamp": self.end_timestamp,
208 "priority": self.priority,
209 }
210
213 """In-memory job representation.
214
215 This is what we use to track the user-submitted jobs. Locking must
216 be taken care of by users of this class.
217
218 @type queue: L{JobQueue}
219 @ivar queue: the parent queue
220 @ivar id: the job ID
221 @type ops: list
222 @ivar ops: the list of _QueuedOpCode that constitute the job
223 @type log_serial: int
224 @ivar log_serial: holds the index for the next log entry
225 @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
227 @ivar end_timestamp: the timestamp for end of execution
228 @ivar writable: Whether the job is allowed to be modified
229
230 """
231
232 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
233 "received_timestamp", "start_timestamp", "end_timestamp",
234 "__weakref__", "processor_lock", "writable", "archived"]
235
237 """Extend the reason trail
238
239 Add the reason for all the opcodes of this job to be executed.
240
241 """
242 count = 0
243 for queued_op in self.ops:
244 op = queued_op.input
245 if pickup:
246 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
247 else:
248 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
249 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
250 reason_src_prefix)
251 reason_text = "job=%d;index=%d" % (self.id, count)
252 reason = getattr(op, "reason", [])
253 reason.append((reason_src, reason_text, utils.EpochNano()))
254 op.reason = reason
255 count = count + 1
256
def __init__(self, queue, job_id, ops, writable):
  """Constructor for the _QueuedJob.

  @type queue: L{JobQueue}
  @param queue: our parent queue
  @type job_id: job_id
  @param job_id: our job id
  @type ops: list
  @param ops: the list of opcodes we hold, which will be encapsulated
      in _QueuedOpCodes
  @type writable: bool
  @param writable: Whether job can be modified
  @raise errors.GenericError: if no opcodes were given

  """
  if not ops:
    raise errors.GenericError("A job needs at least one opcode")

  self.queue = queue
  self.id = int(job_id)
  self.received_timestamp = TimeStampNow()
  self.start_timestamp = None
  self.end_timestamp = None
  self.log_serial = 0
  self.archived = False

  # Encapsulate every input opcode and extend its reason trail (AddReasons
  # reads both self.id and self.ops, so they must be set up first)
  self.ops = [_QueuedOpCode(inp) for inp in ops]
  self.AddReasons()

  self._InitInMemory(self, writable)

  assert not self.archived, "New jobs can not be marked as archived"
287
288 @staticmethod
290 """Initializes in-memory variables.
291
292 """
293 obj.writable = writable
294 obj.ops_iter = None
295 obj.cur_opctx = None
296
297
298 if writable:
299 obj.processor_lock = threading.Lock()
300 else:
301 obj.processor_lock = None
302
304 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
305 "id=%s" % self.id,
306 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
307
308 return "<%s at %#x>" % (" ".join(status), id(self))
309
@classmethod
def Restore(cls, queue, state, writable, archived):
  """Restore a _QueuedJob from serialized state.

  @type queue: L{JobQueue}
  @param queue: to which queue the restored job belongs
  @type state: dict
  @param state: the serialized state
  @type writable: bool
  @param writable: Whether job can be modified
  @type archived: bool
  @param archived: Whether job was already archived
  @rtype: _QueuedJob
  @return: the restored _QueuedJob instance

  """
  obj = _QueuedJob.__new__(cls)
  obj.queue = queue
  obj.id = int(state["id"])
  # Timestamps may be missing from older serialized jobs
  for ts_attr in ("received_timestamp", "start_timestamp", "end_timestamp"):
    setattr(obj, ts_attr, state.get(ts_attr, None))
  obj.archived = archived

  # Restore the opcodes; the job's log serial must not be lower than any
  # serial number found in the restored log entries
  obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
  obj.log_serial = 0
  for restored_op in obj.ops:
    for log_entry in restored_op.log:
      obj.log_serial = max(obj.log_serial, log_entry[0])

  cls._InitInMemory(obj, writable)

  return obj
345
347 """Serialize the _JobQueue instance.
348
349 @rtype: dict
350 @return: the serialized state
351
352 """
353 return {
354 "id": self.id,
355 "ops": [op.Serialize() for op in self.ops],
356 "start_timestamp": self.start_timestamp,
357 "end_timestamp": self.end_timestamp,
358 "received_timestamp": self.received_timestamp,
359 }
360
413
415 """Gets the current priority for this job.
416
417 Only unfinished opcodes are considered. When all are done, the default
418 priority is used.
419
420 @rtype: int
421
422 """
423 priorities = [op.priority for op in self.ops
424 if op.status not in constants.OPS_FINALIZED]
425
426 if not priorities:
427
428 return constants.OP_PRIO_DEFAULT
429
430 return min(priorities)
431
433 """Selectively returns the log entries.
434
435 @type newer_than: None or int
436 @param newer_than: if this is None, return all log entries,
437 otherwise return only the log entries with serial higher
438 than this value
439 @rtype: list
440 @return: the list of the log entries selected
441
442 """
443 if newer_than is None:
444 serial = -1
445 else:
446 serial = newer_than
447
448 entries = []
449 for op in self.ops:
450 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
451
452 return entries
453
455 """Returns information about a job.
456
457 @type fields: list
458 @param fields: names of fields to return
459 @rtype: list
460 @return: list with one element for each field
461 @raise errors.OpExecError: when an invalid field
462 has been passed
463
464 """
465 return _SimpleJobQuery(fields)(self)
466
468 """Mark unfinished opcodes with a given status and result.
469
470 This is an utility function for marking all running or waiting to
471 be run opcodes with a given status. Opcodes which are already
472 finalised are not changed.
473
474 @param status: a given opcode status
475 @param result: the opcode result
476
477 """
478 not_marked = True
479 for op in self.ops:
480 if op.status in constants.OPS_FINALIZED:
481 assert not_marked, "Finalized opcodes found after non-finalized ones"
482 continue
483 op.status = status
484 op.result = result
485 not_marked = False
486
488 """Marks the job as finalized.
489
490 """
491 self.end_timestamp = TimeStampNow()
492
517
519 """Changes the job priority.
520
521 @type priority: int
522 @param priority: New priority
523 @rtype: tuple; (bool, string)
524 @return: Boolean describing whether job's priority was successfully changed
525 and a text message
526
527 """
528 status = self.CalcStatus()
529
530 if status in constants.JOBS_FINALIZED:
531 return (False, "Job %s is finished" % self.id)
532 elif status == constants.JOB_STATUS_CANCELING:
533 return (False, "Job %s is cancelling" % self.id)
534 else:
535 assert status in (constants.JOB_STATUS_QUEUED,
536 constants.JOB_STATUS_WAITING,
537 constants.JOB_STATUS_RUNNING)
538
539 changed = False
540 for op in self.ops:
541 if (op.status == constants.OP_STATUS_RUNNING or
542 op.status in constants.OPS_FINALIZED):
543 assert not changed, \
544 ("Found opcode for which priority should not be changed after"
545 " priority has been changed for previous opcodes")
546 continue
547
548 assert op.status in (constants.OP_STATUS_QUEUED,
549 constants.OP_STATUS_WAITING)
550
551 changed = True
552
553
554 op.priority = priority
555
556 if changed:
557 return (True, ("Priorities of pending opcodes for job %s have been"
558 " changed to %s" % (self.id, priority)))
559 else:
560 return (False, "Job %s had no pending opcodes" % self.id)
561
564
566 """Initializes this class.
567
568 @type queue: L{JobQueue}
569 @param queue: Job queue
570 @type job: L{_QueuedJob}
571 @param job: Job object
572 @type op: L{_QueuedOpCode}
573 @param op: OpCode
574
575 """
576 super(_OpExecCallbacks, self).__init__()
577
578 assert queue, "Queue is missing"
579 assert job, "Job is missing"
580 assert op, "Opcode is missing"
581
582 self._queue = queue
583 self._job = job
584 self._op = op
585
599
600 @locking.ssynchronized(_QUEUE, shared=1)
602 """Mark the opcode as running, not lock-waiting.
603
604 This is called from the mcpu code as a notifier function, when the LU is
605 finally about to start the Exec() method. Of course, to have end-user
606 visible results, the opcode must be initially (before calling into
607 Processor.ExecOpCode) set to OP_STATUS_WAITING.
608
609 """
610 assert self._op in self._job.ops
611 assert self._op.status in (constants.OP_STATUS_WAITING,
612 constants.OP_STATUS_CANCELING)
613
614
615 self._CheckCancel()
616
617 logging.debug("Opcode is now running")
618
619 self._op.status = constants.OP_STATUS_RUNNING
620 self._op.exec_timestamp = TimeStampNow()
621
622
623 self._queue.UpdateJobUnlocked(self._job)
624
625 @locking.ssynchronized(_QUEUE, shared=1)
627 """Internal feedback append function, with locks
628
629 """
630 self._job.log_serial += 1
631 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
632 self._queue.UpdateJobUnlocked(self._job, replicate=False)
633
650
662
664 """Submits jobs for processing.
665
666 See L{JobQueue.SubmitManyJobs}.
667
668 """
669
670 return self._queue.SubmitManyJobs(jobs)
671
def __init__(self, fields, prev_job_info, prev_log_serial):
  """Initializes this class.

  @type fields: list of strings
  @param fields: Fields requested by LUXI client
  @type prev_job_info: string
  @param prev_job_info: previous job info, as passed by the LUXI client
  @type prev_log_serial: string
  @param prev_log_serial: previous job serial, as passed by the LUXI client

  """
  # Build the query object once so every check reuses the parsed field list
  self._squery = _SimpleJobQuery(fields)
  (self._prev_job_info, self._prev_log_serial) = \
    (prev_job_info, prev_log_serial)
688
725
def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
  """Initializes this class.

  Sets up inotify-based watching of the job file: a watch manager, a
  L{asyncnotifier.SingleFileEventHandler} calling L{_OnInotify}, and a
  pyinotify notifier dispatching events to that handler.

  @type filename: string
  @param filename: Path to job file
  @param _inotify_wm_cls: Watch manager class, overridable for tests
  @raises errors.InotifyError: if the notifier cannot be setup

  """
  self._wm = _inotify_wm_cls()
  self._inotify_handler = \
    asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
  self._notifier = \
    pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
  try:
    self._inotify_handler.enable()
  except Exception:
    # If enabling the watch fails, stop the notifier before re-raising so
    # its resources (presumably an inotify file descriptor) are released
    self._notifier.stop()
    raise
747
749 """Callback for inotify.
750
751 """
752 if not notifier_enabled:
753 self._inotify_handler.enable()
754
def Wait(self, timeout):
  """Waits for the job file to change.

  @type timeout: float
  @param timeout: Timeout in seconds
  @return: Whether there have been events

  """
  assert timeout >= 0

  # pyinotify's check_events takes the timeout in milliseconds
  pending = self._notifier.check_events(timeout * 1000)
  if pending:
    self._notifier.read_events()
    self._notifier.process_events()

  return pending
769
771 """Closes underlying notifier and its file descriptor.
772
773 """
774 self._notifier.stop()
775
779 """Initializes this class.
780
781 @type filename: string
782 @param filename: Path to job file
783
784 """
785 self._filewaiter = None
786 self._filename = filename
787 self._waiter_cls = _waiter_cls
788
def Wait(self, timeout):
  """Waits for a job to change.

  @type timeout: float
  @param timeout: Timeout in seconds
  @return: Whether there have been events

  """
  if not self._filewaiter:
    # No file waiter exists yet; create it now and immediately report a
    # change, presumably so the caller re-examines the job right away
    self._filewaiter = self._waiter_cls(self._filename)
    return True

  return self._filewaiter.Wait(timeout)
807
809 """Closes underlying waiter.
810
811 """
812 if self._filewaiter:
813 self._filewaiter.Close()
814
817 """Helper class using inotify to wait for changes in a job file.
818
819 This class takes a previous job status and serial, and alerts the client when
820 the current job status has changed.
821
822 """
823 @staticmethod
825 if counter.next() > 0:
826
827
828
829 time.sleep(0.1)
830
831 job = job_load_fn()
832 if not job:
833 raise errors.JobLost()
834
835 result = check_fn(job)
836 if result is None:
837 raise utils.RetryAgain()
838
839 return result
840
def __call__(self, filename, job_load_fn,
             fields, prev_job_info, prev_log_serial, timeout,
             _waiter_cls=_JobChangesWaiter):
  """Waits for changes on a job.

  @type filename: string
  @param filename: File on which to wait for changes
  @type job_load_fn: callable
  @param job_load_fn: Function to load job
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @return: Result of the change check, C{None} if the job was lost and
      L{constants.JOB_NOTCHANGED} if the timeout expired

  """
  counter = itertools.count()
  try:
    checker = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
    waiter = _waiter_cls(filename)
    try:
      # Retry the check until it succeeds, waiting on the job file between
      # attempts, but never longer than the overall timeout
      check_wrapper = compat.partial(self._CheckForChanges,
                                     counter, job_load_fn, checker)
      return utils.Retry(check_wrapper, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=waiter.Wait)
    finally:
      waiter.Close()
  except errors.JobLost:
    return None
  except utils.RetryTimeout:
    return constants.JOB_NOTCHANGED
875
887
891 """Initializes this class.
892
893 """
894 self._fn = fn
895 self._next = None
896
898 """Gets the next timeout if necessary.
899
900 """
901 if self._next is None:
902 self._next = self._fn()
903
905 """Returns the next timeout.
906
907 """
908 self._Advance()
909 return self._next
910
912 """Returns the current timeout and advances the internal state.
913
914 """
915 self._Advance()
916 result = self._next
917 self._next = None
918 return result
919
920
921 -class _OpExecContext:
922 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
923 """Initializes this class.
924
925 """
926 self.op = op
927 self.index = index
928 self.log_prefix = log_prefix
929 self.summary = op.input.Summary()
930
931
932 if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
933 self.jobdeps = op.input.depends[:]
934 else:
935 self.jobdeps = None
936
937 self._timeout_strategy_factory = timeout_strategy_factory
938 self._ResetTimeoutStrategy()
939
941 """Creates a new timeout strategy.
942
943 """
944 self._timeout_strategy = \
945 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
946
948 """Checks whether priority can and should be increased.
949
950 Called when locks couldn't be acquired.
951
952 """
953 op = self.op
954
955
956
957 if (self._timeout_strategy.Peek() is None and
958 op.priority > constants.OP_PRIO_HIGHEST):
959 logging.debug("Increasing priority")
960 op.priority -= 1
961 self._ResetTimeoutStrategy()
962 return True
963
964 return False
965
967 """Returns the next lock acquire timeout.
968
969 """
970 return self._timeout_strategy.Next()
971
974 (DEFER,
975 WAITDEP,
976 FINISHED) = range(1, 4)
977
980 """Initializes this class.
981
982 """
983 self.queue = queue
984 self.opexec_fn = opexec_fn
985 self.job = job
986 self._timeout_strategy_factory = _timeout_strategy_factory
987
988 @staticmethod
990 """Locates the next opcode to run.
991
992 @type job: L{_QueuedJob}
993 @param job: Job object
994 @param timeout_strategy_factory: Callable to create new timeout strategy
995
996 """
997
998
999
1000
1001 if job.ops_iter is None:
1002 job.ops_iter = enumerate(job.ops)
1003
1004
1005 while True:
1006 try:
1007 (idx, op) = job.ops_iter.next()
1008 except StopIteration:
1009 raise errors.ProgrammerError("Called for a finished job")
1010
1011 if op.status == constants.OP_STATUS_RUNNING:
1012
1013 raise errors.ProgrammerError("Called for job marked as running")
1014
1015 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
1016 timeout_strategy_factory)
1017
1018 if op.status not in constants.OPS_FINALIZED:
1019 return opctx
1020
1021
1022
1023
1024
1025 logging.info("%s: opcode %s already processed, skipping",
1026 opctx.log_prefix, opctx.summary)
1027
1028 @staticmethod
1030 """Marks an opcode as waiting for locks.
1031
1032 The job's start timestamp is also set if necessary.
1033
1034 @type job: L{_QueuedJob}
1035 @param job: Job object
1036 @type op: L{_QueuedOpCode}
1037 @param op: Opcode object
1038
1039 """
1040 assert op in job.ops
1041 assert op.status in (constants.OP_STATUS_QUEUED,
1042 constants.OP_STATUS_WAITING)
1043
1044 update = False
1045
1046 op.result = None
1047
1048 if op.status == constants.OP_STATUS_QUEUED:
1049 op.status = constants.OP_STATUS_WAITING
1050 update = True
1051
1052 if op.start_timestamp is None:
1053 op.start_timestamp = TimeStampNow()
1054 update = True
1055
1056 if job.start_timestamp is None:
1057 job.start_timestamp = op.start_timestamp
1058 update = True
1059
1060 assert op.status == constants.OP_STATUS_WAITING
1061
1062 return update
1063
1064 @staticmethod
1066 """Checks if an opcode has dependencies and if so, processes them.
1067
1068 @type queue: L{JobQueue}
1069 @param queue: Queue object
1070 @type job: L{_QueuedJob}
1071 @param job: Job object
1072 @type opctx: L{_OpExecContext}
1073 @param opctx: Opcode execution context
1074 @rtype: bool
1075 @return: Whether opcode will be re-scheduled by dependency tracker
1076
1077 """
1078 op = opctx.op
1079
1080 result = False
1081
1082 while opctx.jobdeps:
1083 (dep_job_id, dep_status) = opctx.jobdeps[0]
1084
1085 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1086 dep_status)
1087 assert ht.TNonEmptyString(depmsg), "No dependency message"
1088
1089 logging.info("%s: %s", opctx.log_prefix, depmsg)
1090
1091 if depresult == _JobDependencyManager.CONTINUE:
1092
1093 opctx.jobdeps.pop(0)
1094
1095 elif depresult == _JobDependencyManager.WAIT:
1096
1097
1098 result = True
1099 break
1100
1101 elif depresult == _JobDependencyManager.CANCEL:
1102
1103 job.Cancel()
1104 assert op.status == constants.OP_STATUS_CANCELING
1105 break
1106
1107 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1108 _JobDependencyManager.ERROR):
1109
1110 op.status = constants.OP_STATUS_ERROR
1111 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1112 break
1113
1114 else:
1115 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1116 depresult)
1117
1118 return result
1119
1121 """Processes one opcode and returns the result.
1122
1123 """
1124 op = opctx.op
1125
1126 assert op.status in (constants.OP_STATUS_WAITING,
1127 constants.OP_STATUS_CANCELING)
1128
1129
1130 if op.status == constants.OP_STATUS_CANCELING:
1131 return (constants.OP_STATUS_CANCELING, None)
1132
1133 timeout = opctx.GetNextLockTimeout()
1134
1135 try:
1136
1137 result = self.opexec_fn(op.input,
1138 _OpExecCallbacks(self.queue, self.job, op),
1139 timeout=timeout)
1140 except mcpu.LockAcquireTimeout:
1141 assert timeout is not None, "Received timeout for blocking acquire"
1142 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1143
1144 assert op.status in (constants.OP_STATUS_WAITING,
1145 constants.OP_STATUS_CANCELING)
1146
1147
1148 if op.status == constants.OP_STATUS_CANCELING:
1149 return (constants.OP_STATUS_CANCELING, None)
1150
1151
1152 if not self.queue.AcceptingJobsUnlocked():
1153 return (constants.OP_STATUS_QUEUED, None)
1154
1155
1156 return (constants.OP_STATUS_WAITING, None)
1157 except CancelJob:
1158 logging.exception("%s: Canceling job", opctx.log_prefix)
1159 assert op.status == constants.OP_STATUS_CANCELING
1160 return (constants.OP_STATUS_CANCELING, None)
1161
1162 except QueueShutdown:
1163 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1164
1165 assert op.status == constants.OP_STATUS_WAITING
1166
1167
1168 return (constants.OP_STATUS_QUEUED, None)
1169
1170 except Exception, err:
1171 logging.exception("%s: Caught exception in %s",
1172 opctx.log_prefix, opctx.summary)
1173 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1174 else:
1175 logging.debug("%s: %s successful",
1176 opctx.log_prefix, opctx.summary)
1177 return (constants.OP_STATUS_SUCCESS, result)
1178
1180 """Continues execution of a job.
1181
1182 @param _nextop_fn: Callback function for tests
1183 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1184 be deferred and C{WAITDEP} if the dependency manager
1185 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1186
1187 """
1188 queue = self.queue
1189 job = self.job
1190
1191 logging.debug("Processing job %s", job.id)
1192
1193 queue.acquire(shared=1)
1194 try:
1195 opcount = len(job.ops)
1196
1197 assert job.writable, "Expected writable job"
1198
1199
1200 if job.CalcStatus() in constants.JOBS_FINALIZED:
1201 return self.FINISHED
1202
1203
1204 if job.cur_opctx:
1205 opctx = job.cur_opctx
1206 job.cur_opctx = None
1207 else:
1208 if __debug__ and _nextop_fn:
1209 _nextop_fn()
1210 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1211
1212 op = opctx.op
1213
1214
1215 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1216 constants.OP_STATUS_CANCELING)
1217 for i in job.ops[opctx.index + 1:])
1218
1219 assert op.status in (constants.OP_STATUS_QUEUED,
1220 constants.OP_STATUS_WAITING,
1221 constants.OP_STATUS_CANCELING)
1222
1223 assert (op.priority <= constants.OP_PRIO_LOWEST and
1224 op.priority >= constants.OP_PRIO_HIGHEST)
1225
1226 waitjob = None
1227
1228 if op.status != constants.OP_STATUS_CANCELING:
1229 assert op.status in (constants.OP_STATUS_QUEUED,
1230 constants.OP_STATUS_WAITING)
1231
1232
1233 if self._MarkWaitlock(job, op):
1234
1235 queue.UpdateJobUnlocked(job)
1236
1237 assert op.status == constants.OP_STATUS_WAITING
1238 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1239 assert job.start_timestamp and op.start_timestamp
1240 assert waitjob is None
1241
1242
1243 waitjob = self._CheckDependencies(queue, job, opctx)
1244
1245 assert op.status in (constants.OP_STATUS_WAITING,
1246 constants.OP_STATUS_CANCELING,
1247 constants.OP_STATUS_ERROR)
1248
1249 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1250 constants.OP_STATUS_ERROR)):
1251 logging.info("%s: opcode %s waiting for locks",
1252 opctx.log_prefix, opctx.summary)
1253
1254 assert not opctx.jobdeps, "Not all dependencies were removed"
1255
1256 queue.release()
1257 try:
1258 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1259 finally:
1260 queue.acquire(shared=1)
1261
1262 op.status = op_status
1263 op.result = op_result
1264
1265 assert not waitjob
1266
1267 if op.status in (constants.OP_STATUS_WAITING,
1268 constants.OP_STATUS_QUEUED):
1269
1270
1271 assert not op.end_timestamp
1272 else:
1273
1274 op.end_timestamp = TimeStampNow()
1275
1276 if op.status == constants.OP_STATUS_CANCELING:
1277 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1278 for i in job.ops[opctx.index:])
1279 else:
1280 assert op.status in constants.OPS_FINALIZED
1281
1282 if op.status == constants.OP_STATUS_QUEUED:
1283
1284 assert not waitjob
1285
1286 finalize = False
1287
1288
1289 job.cur_opctx = None
1290
1291
1292 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1293
1294 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1295 finalize = False
1296
1297 if not waitjob and opctx.CheckPriorityIncrease():
1298
1299 queue.UpdateJobUnlocked(job)
1300
1301
1302 job.cur_opctx = opctx
1303
1304 assert (op.priority <= constants.OP_PRIO_LOWEST and
1305 op.priority >= constants.OP_PRIO_HIGHEST)
1306
1307
1308 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1309
1310 else:
1311
1312 assert (opctx.index == 0 or
1313 compat.all(i.status == constants.OP_STATUS_SUCCESS
1314 for i in job.ops[:opctx.index]))
1315
1316
1317 job.cur_opctx = None
1318
1319 if op.status == constants.OP_STATUS_SUCCESS:
1320 finalize = False
1321
1322 elif op.status == constants.OP_STATUS_ERROR:
1323
1324 assert errors.GetEncodedError(job.ops[opctx.index].result)
1325
1326 to_encode = errors.OpExecError("Preceding opcode failed")
1327 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1328 _EncodeOpError(to_encode))
1329 finalize = True
1330
1331
1332 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1333 errors.GetEncodedError(i.result)
1334 for i in job.ops[opctx.index:])
1335
1336 elif op.status == constants.OP_STATUS_CANCELING:
1337 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1338 "Job canceled by request")
1339 finalize = True
1340
1341 else:
1342 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1343
1344 if opctx.index == (opcount - 1):
1345
1346 finalize = True
1347
1348 if finalize:
1349
1350 job.Finalize()
1351
1352
1353
1354 queue.UpdateJobUnlocked(job)
1355
1356 assert not waitjob
1357
1358 if finalize:
1359 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1360 return self.FINISHED
1361
1362 assert not waitjob or queue.depmgr.JobWaiting(job)
1363
1364 if waitjob:
1365 return self.WAITDEP
1366 else:
1367 return self.DEFER
1368 finally:
1369 assert job.writable, "Job became read-only while being processed"
1370 queue.release()
1371
1394
1397 """The actual job workers.
1398
1399 """
1401 """Job executor.
1402
1403 @type job: L{_QueuedJob}
1404 @param job: the job to be processed
1405
1406 """
1407 assert job.writable, "Expected writable job"
1408
1409
1410
1411
1412 job.processor_lock.acquire()
1413 try:
1414 return self._RunTaskInner(job)
1415 finally:
1416 job.processor_lock.release()
1417
1438
1439 @staticmethod
1441 """Updates the worker thread name to include a short summary of the opcode.
1442
1443 @param setname_fn: Callable setting worker thread name
1444 @param execop_fn: Callable for executing opcode (usually
1445 L{mcpu.Processor.ExecOpCode})
1446
1447 """
1448 setname_fn(op)
1449 try:
1450 return execop_fn(op, *args, **kwargs)
1451 finally:
1452 setname_fn(None)
1453
1454 @staticmethod
1456 """Sets the worker thread name.
1457
1458 @type job: L{_QueuedJob}
1459 @type op: L{opcodes.OpCode}
1460
1461 """
1462 parts = ["Job%s" % job.id]
1463
1464 if op:
1465 parts.append(op.TinySummary())
1466
1467 return "/".join(parts)
1468
1471 """Simple class implementing a job-processing workerpool.
1472
1473 """
1479
1482 """Keeps track of job dependencies.
1483
1484 """
1485 (WAIT,
1486 ERROR,
1487 CANCEL,
1488 CONTINUE,
1489 WRONGSTATUS) = range(1, 6)
1490
def __init__(self, getstatus_fn, enqueue_fn):
  """Initializes this class.

  @param getstatus_fn: Callable returning a job's status for a given job ID
  @param enqueue_fn: Callable used to re-add jobs to the worker pool

  """
  (self._getstatus_fn, self._enqueue_fn) = (getstatus_fn, enqueue_fn)

  # Maps a dependency job ID to the set of jobs waiting for it
  self._waiters = {}
  self._lock = locking.SharedLock("JobDepMgr")
1500
1501 @locking.ssynchronized(_LOCK, shared=1)
1503 """Retrieves information about waiting jobs.
1504
1505 @type requested: set
1506 @param requested: Requested information, see C{query.LQ_*}
1507
1508 """
1509
1510
1511
1512 return [("job/%s" % job_id, None, None,
1513 [("job", [job.id for job in waiters])])
1514 for job_id, waiters in self._waiters.items()
1515 if waiters]
1516
1517 @locking.ssynchronized(_LOCK, shared=1)
1519 """Checks if a job is waiting.
1520
1521 """
1522 return compat.any(job in jobs
1523 for jobs in self._waiters.values())
1524
  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for the dependency to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list; the dependency is finalized now
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      # An empty dep_status means "any finalized status is acceptable"
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1582
1584 """Remove all jobs without actual waiters.
1585
1586 """
1587 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1588 if not waiters]:
1589 del self._waiters[job_id]
1590
1592 """Notifies all jobs waiting for a certain job ID.
1593
1594 @attention: Do not call until L{CheckAndRegister} returned a status other
1595 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1596 @type job_id: int
1597 @param job_id: Job ID
1598
1599 """
1600 assert ht.TJobId(job_id)
1601
1602 self._lock.acquire()
1603 try:
1604 self._RemoveEmptyWaitersUnlocked()
1605
1606 jobs = self._waiters.pop(job_id, None)
1607 finally:
1608 self._lock.release()
1609
1610 if jobs:
1611
1612 logging.debug("Re-adding %s jobs which were waiting for job %s",
1613 len(jobs), job_id)
1614 self._enqueue_fn(jobs)
1615
1618 """Decorator for "public" functions.
1619
1620 This function should be used for all 'public' functions. That is,
1621 functions usually called from other classes. Note that this should
1622 be applied only to methods (not plain functions), since it expects
1623 that the decorated function is called with a first argument that has
1624 a '_queue_filelock' argument.
1625
1626 @warning: Use this decorator only after locking.ssynchronized
1627
1628 Example::
1629 @locking.ssynchronized(_LOCK)
1630 @_RequireOpenQueue
1631 def Example(self):
1632 pass
1633
1634 """
1635 def wrapper(self, *args, **kwargs):
1636
1637 assert self._queue_filelock is not None, "Queue should be open"
1638 return fn(self, *args, **kwargs)
1639 return wrapper
1640
1643 """Decorator checking for a non-drained queue.
1644
1645 To be used with functions submitting new jobs.
1646
1647 """
1648 def wrapper(self, *args, **kwargs):
1649 """Wrapper function.
1650
1651 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1652
1653 """
1654
1655
1656
1657 if self._drained:
1658 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1659
1660 if not self._accepting_jobs:
1661 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1662
1663 return fn(self, *args, **kwargs)
1664 return wrapper
1665
1668 """Queue used to manage the jobs.
1669
1670 """
1672 """Constructor for JobQueue.
1673
1674 The constructor will initialize the job queue object and then
1675 start loading the current jobs from disk, either for starting them
1676 (if they were queue) or for aborting them (if they were already
1677 running).
1678
1679 @type context: GanetiContext
1680 @param context: the context object for access to the configuration
1681 data and other ganeti objects
1682
1683 """
1684 self.context = context
1685 self._memcache = weakref.WeakValueDictionary()
1686 self._my_hostname = netutils.Hostname.GetSysName()
1687
1688
1689
1690
1691
1692
1693 self._lock = locking.SharedLock("JobQueue")
1694
1695 self.acquire = self._lock.acquire
1696 self.release = self._lock.release
1697
1698
1699 self._accepting_jobs = True
1700
1701
1702
1703 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1704
1705
1706 self._last_serial = jstore.ReadSerial()
1707 assert self._last_serial is not None, ("Serial file was modified between"
1708 " check in jstore and here")
1709
1710
1711 self._nodes = dict((n.name, n.primary_ip)
1712 for n in self.context.cfg.GetAllNodesInfo().values()
1713 if n.master_candidate)
1714
1715
1716 self._nodes.pop(self._my_hostname, None)
1717
1718
1719
1720 self._queue_size = None
1721 self._UpdateQueueSizeUnlocked()
1722 assert ht.TInt(self._queue_size)
1723 self._drained = jstore.CheckDrainFlag()
1724
1725
1726 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1727 self._EnqueueJobs)
1728 self.context.glm.AddToLockMonitor(self.depmgr)
1729
1730
1731 self._wpool = _JobQueueWorkerPool(self)
1732
1768
1769 @locking.ssynchronized(_LOCK)
1772
1774 """Gets RPC runner with context.
1775
1776 """
1777 return rpc.JobQueueRunner(self.context, address_list)
1778
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content; best-effort per file, failures are only logged
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag on the new node to match the local one
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip
1834
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # Tolerate the node already being absent
    self._nodes.pop(node_name, None)
1845
1846 @staticmethod
1848 """Verifies the status of an RPC call.
1849
1850 Since we aim to keep consistency should this node (the current
1851 master) fail, we will log errors if our rpc fail, and especially
1852 log the case when more than half of the nodes fails.
1853
1854 @param result: the data as returned from the rpc call
1855 @type nodes: list
1856 @param nodes: the list of nodes we made the call to
1857 @type failmsg: str
1858 @param failmsg: the identifier to be used for logging
1859
1860 """
1861 failed = []
1862 success = []
1863
1864 for node in nodes:
1865 msg = result[node].fail_msg
1866 if msg:
1867 failed.append(node)
1868 logging.error("RPC call %s (%s) failed on node %s: %s",
1869 result[node].call, failmsg, node, msg)
1870 else:
1871 success.append(node)
1872
1873
1874 if (len(success) + 1) < len(failed):
1875
1876 logging.error("More than half of the nodes failed")
1877
1879 """Helper for returning the node name/ip list.
1880
1881 @rtype: (list, list)
1882 @return: a tuple of two lists, the first one with the node
1883 names and the second one with the node addresses
1884
1885 """
1886
1887 name_list = self._nodes.keys()
1888 addr_list = [self._nodes[name] for name in name_list]
1889 return name_list, addr_list
1890
1892 """Writes a file locally and then replicates it to all nodes.
1893
1894 This function will replace the contents of a file on the local
1895 node and then replicate it to all the other nodes we have.
1896
1897 @type file_name: str
1898 @param file_name: the path of the file to be replicated
1899 @type data: str
1900 @param data: the new contents of the file
1901 @type replicate: boolean
1902 @param replicate: whether to spread the changes to the remote nodes
1903
1904 """
1905 getents = runtime.GetEnts()
1906 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1907 gid=getents.daemons_gid,
1908 mode=constants.JOB_QUEUE_FILES_PERMS)
1909
1910 if replicate:
1911 names, addrs = self._GetNodeIp()
1912 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1913 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1914
1916 """Renames a file locally and then replicate the change.
1917
1918 This function will rename a file in the local queue directory
1919 and then replicate this rename to all the other nodes we have.
1920
1921 @type rename: list of (old, new)
1922 @param rename: List containing tuples mapping old to new names
1923
1924 """
1925
1926 for old, new in rename:
1927 utils.RenameFile(old, new, mkdir=True)
1928
1929
1930 names, addrs = self._GetNodeIp()
1931 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1932 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1933
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier (callers also pass ints; "%s" accepts
        both)
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1945
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    # Archived jobs are spread over subdirectories, see
    # jstore.GetArchiveDirectory
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
1959
1960 @staticmethod
1962 """Build list of directories containing job files.
1963
1964 @type archived: bool
1965 @param archived: Whether to include directories for archived jobs
1966 @rtype: list
1967
1968 """
1969 result = [pathutils.QUEUE_DIR]
1970
1971 if archived:
1972 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1973 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1974 utils.ListVisibleFiles(archive_path)))
1975
1976 return result
1977
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to include archived job directories in the scan
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        # Only files matching the job file pattern ("job-<id>") count
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist
2003
2005 """Loads a job from the disk or memory.
2006
2007 Given a job id, this will return the cached job object if
2008 existing, or try to load the job from the disk. If loading from
2009 disk, it will also add the job to the cache.
2010
2011 @type job_id: int
2012 @param job_id: the job id
2013 @rtype: L{_QueuedJob} or None
2014 @return: either None or the job object
2015
2016 """
2017 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2018
2019 job = self._memcache.get(job_id, None)
2020 if job:
2021 logging.debug("Found job %s in memcache", job_id)
2022 assert job.writable, "Found read-only job in memcache"
2023 return job
2024
2025 try:
2026 job = self._LoadJobFromDisk(job_id, False)
2027 if job is None:
2028 return job
2029 except errors.JobFileCorrupted:
2030 old_path = self._GetJobPath(job_id)
2031 new_path = self._GetArchivedJobPath(job_id)
2032 if old_path == new_path:
2033
2034 logging.exception("Can't parse job %s", job_id)
2035 else:
2036
2037 logging.exception("Can't parse job %s, will archive.", job_id)
2038 self._RenameFilesUnlocked([(old_path, new_path)])
2039 return None
2040
2041 assert job.writable, "Job just loaded is not writable"
2042
2043 self._memcache[job_id] = job
2044 logging.debug("Added job %s to the cache", job_id)
2045 return job
2046
2048 """Load the given job file from disk.
2049
2050 Given a job file, read, load and restore it in a _QueuedJob format.
2051
2052 @type job_id: int
2053 @param job_id: job identifier
2054 @type try_archived: bool
2055 @param try_archived: Whether to try loading an archived job
2056 @rtype: L{_QueuedJob} or None
2057 @return: either None or the job object
2058
2059 """
2060 path_functions = [(self._GetJobPath, False)]
2061
2062 if try_archived:
2063 path_functions.append((self._GetArchivedJobPath, True))
2064
2065 raw_data = None
2066 archived = None
2067
2068 for (fn, archived) in path_functions:
2069 filepath = fn(job_id)
2070 logging.debug("Loading job from %s", filepath)
2071 try:
2072 raw_data = utils.ReadFile(filepath)
2073 except EnvironmentError, err:
2074 if err.errno != errno.ENOENT:
2075 raise
2076 else:
2077 break
2078
2079 if not raw_data:
2080 return None
2081
2082 if writable is None:
2083 writable = not archived
2084
2085 try:
2086 data = serializer.LoadJson(raw_data)
2087 job = _QueuedJob.Restore(self, data, writable, archived)
2088 except Exception, err:
2089 raise errors.JobFileCorrupted(err)
2090
2091 return job
2092
2094 """Load the given job file from disk.
2095
2096 Given a job file, read, load and restore it in a _QueuedJob format.
2097 In case of error reading the job, it gets returned as None, and the
2098 exception is logged.
2099
2100 @type job_id: int
2101 @param job_id: job identifier
2102 @type try_archived: bool
2103 @param try_archived: Whether to try loading an archived job
2104 @rtype: L{_QueuedJob} or None
2105 @return: either None or the job object
2106
2107 """
2108 try:
2109 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2110 except (errors.JobFileCorrupted, EnvironmentError):
2111 logging.exception("Can't load/parse job %s", job_id)
2112 return None
2113
2115 """Update the queue size.
2116
2117 """
2118 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2119
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2142
2143 @classmethod
2149
2150 @classmethod
2159
2160 @classmethod
2166
2167 @staticmethod
2174
2175 @staticmethod
2177 """Resolves relative job IDs in dependencies.
2178
2179 @type resolve_fn: callable
2180 @param resolve_fn: Function to resolve a relative job ID
2181 @type deps: list
2182 @param deps: Dependencies
2183 @rtype: tuple; (boolean, string or list)
2184 @return: If successful (first tuple item), the returned list contains
2185 resolved job IDs along with the requested status; if not successful,
2186 the second element is an error message
2187
2188 """
2189 result = []
2190
2191 for (dep_job_id, dep_status) in deps:
2192 if ht.TRelativeJobId(dep_job_id):
2193 assert ht.TInt(dep_job_id) and dep_job_id < 0
2194 try:
2195 job_id = resolve_fn(dep_job_id)
2196 except IndexError:
2197
2198 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2199 else:
2200 job_id = dep_job_id
2201
2202 result.append((job_id, dep_status))
2203
2204 return (True, result)
2205
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    Lock-acquiring wrapper around L{_EnqueueJobsUnlocked}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)
2215
2217 """Helper function to add jobs to worker pool's queue.
2218
2219 @type jobs: list
2220 @param jobs: List of all jobs
2221
2222 """
2223 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2224 self._wpool.AddManyTasks([(job, ) for job in jobs],
2225 priority=[job.CalcPriority() for job in jobs],
2226 task_id=map(_GetIdAttr, jobs))
2227
2229 """Gets the status of a job for dependencies.
2230
2231 @type job_id: int
2232 @param job_id: Job ID
2233 @raise errors.JobLost: If job can't be found
2234
2235 """
2236
2237
2238
2239 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2240
2241 assert not job.writable, "Got writable job"
2242
2243 if job:
2244 return job.CalcStatus()
2245
2246 raise errors.JobLost("Job %s not found" % job_id)
2247
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job is finalized if and only if it has an end timestamp
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
2272
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    # The helper reloads the job via this callable; always read-only and
    # including archived jobs
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
2304
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2319
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          # If the job is queued in the worker pool, its task priority must
          # be adjusted as well
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          # Not an error: the job may currently be running or waiting
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
2350
2352 """Modifies a job.
2353
2354 @type job_id: int
2355 @param job_id: Job ID
2356 @type mod_fn: callable
2357 @param mod_fn: Modifying function, receiving job object as parameter,
2358 returning tuple of (status boolean, message string)
2359
2360 """
2361 job = self._LoadJobUnlocked(job_id)
2362 if not job:
2363 logging.debug("Job %s not found", job_id)
2364 return (False, "Job %s not found" % job_id)
2365
2366 assert job.writable, "Can't modify read-only job"
2367 assert not job.archived, "Can't modify archived job"
2368
2369 (success, msg) = mod_fn(job)
2370
2371 if success:
2372
2373
2374 self.UpdateJobUnlocked(job)
2375
2376 return (success, msg)
2377
2378 @_RequireOpenQueue
2380 """Archives jobs.
2381
2382 @type jobs: list of L{_QueuedJob}
2383 @param jobs: Job objects
2384 @rtype: int
2385 @return: Number of archived jobs
2386
2387 """
2388 archive_jobs = []
2389 rename_files = []
2390 for job in jobs:
2391 assert job.writable, "Can't archive read-only job"
2392 assert not job.archived, "Can't cancel archived job"
2393
2394 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2395 logging.debug("Job %s is not yet done", job.id)
2396 continue
2397
2398 archive_jobs.append(job)
2399
2400 old = self._GetJobPath(job.id)
2401 new = self._GetArchivedJobPath(job.id)
2402 rename_files.append((old, new))
2403
2404
2405 self._RenameFilesUnlocked(rename_files)
2406
2407 logging.debug("Successfully archived job(s) %s",
2408 utils.CommaJoin(job.id for job in archive_jobs))
2409
2410
2411
2412
2413
2414 self._UpdateQueueSizeUnlocked()
2415 return len(archive_jobs)
2416
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    # _ArchiveJobsUnlocked returns the number of jobs it archived
    return self._ArchiveJobsUnlocked([job]) == 1
2438
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: maximum time (in seconds) to spend archiving before
        giving up on the remaining jobs
    @rtype: tuple
    @return: (number of jobs archived, number of jobs not examined)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Give up if the time budget is exhausted; remaining jobs are
      # reported as "not touched"
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        # Pick the most relevant timestamp: end > start > received
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive in batches of 10 jobs
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2494
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it applies to.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: (query object, list of (job id, job object) tuples, whether
        all known jobs were listed)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only scanned if the query actually needs the
    # "archived" data
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    # No names requested means "all jobs"
    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's
      # no risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      # When listing all jobs, unloadable ones are silently skipped; when
      # specific IDs were requested, a None entry is kept so the caller sees
      # the job is missing
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)
2521
2523 """Returns a list of jobs in queue.
2524
2525 @type fields: sequence
2526 @param fields: List of wanted fields
2527 @type qfilter: None or query2 filter (list)
2528 @param qfilter: Query filter
2529
2530 """
2531 (qobj, ctx, _) = self._Query(fields, qfilter)
2532
2533 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2534
2536 """Returns a list of jobs in queue.
2537
2538 @type job_ids: list
2539 @param job_ids: sequence of job identifiers or None for all
2540 @type fields: list
2541 @param fields: names of fields to return
2542 @rtype: list
2543 @return: list one element per job, each element being list with
2544 the requested fields
2545
2546 """
2547
2548 job_ids = [int(jid) for jid in job_ids]
2549 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2550
2551 (qobj, ctx, _) = self._Query(fields, qfilter)
2552
2553 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2554
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
2579
2581 """Returns whether jobs are accepted.
2582
2583 Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2584 queue is shutting down.
2585
2586 @rtype: bool
2587
2588 """
2589 return self._accepting_jobs
2590
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    # Release the queue file lock; _RequireOpenQueue-decorated methods will
    # assert from now on
    self._queue_filelock.Close()
    self._queue_filelock = None