22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40
41 from pyinotify import pyinotify
42 except ImportError:
43 import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60
61
62 JOBQUEUE_THREADS = 25
63 JOBS_PER_ARCHIVE_DIRECTORY = 10000
64
65
66 _LOCK = "_lock"
67 _QUEUE = "_queue"
71 """Special exception to cancel a job.
72
73 """
74
77 """Returns the current timestamp.
78
79 @rtype: tuple
80 @return: the current time in the (seconds, microseconds) format
81
82 """
83 return utils.SplitTime(time.time())
84
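# Illustrative note (not part of the original module): utils.SplitTime
# splits a float timestamp into whole seconds and microseconds, so a
# wall-clock time of 1234567890.5 comes back as (1234567890, 500000).
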
87 """Encapsulates an opcode object.
88
89 @ivar log: holds the execution log and consists of tuples
90 of the form C{(log_serial, timestamp, level, message)}
91 @ivar input: the OpCode we encapsulate
92 @ivar status: the current status
93 @ivar result: the result of the LU execution
94 @ivar start_timestamp: timestamp for the start of the execution
95 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96 @ivar stop_timestamp: timestamp for the end of the execution
97
98 """
99 __slots__ = ["input", "status", "result", "log", "priority",
100 "start_timestamp", "exec_timestamp", "end_timestamp",
101 "__weakref__"]
102
104 """Constructor for the _QuededOpCode.
105
106 @type op: L{opcodes.OpCode}
107 @param op: the opcode we encapsulate
108
109 """
110 self.input = op
111 self.status = constants.OP_STATUS_QUEUED
112 self.result = None
113 self.log = []
114 self.start_timestamp = None
115 self.exec_timestamp = None
116 self.end_timestamp = None
117
118
119 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120
  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

143 """Serializes this _QueuedOpCode.
144
145 @rtype: dict
146 @return: the dictionary holding the serialized state
147
148 """
149 return {
150 "input": self.input.__getstate__(),
151 "status": self.status,
152 "result": self.result,
153 "log": self.log,
154 "start_timestamp": self.start_timestamp,
155 "exec_timestamp": self.exec_timestamp,
156 "end_timestamp": self.end_timestamp,
157 "priority": self.priority,
158 }
159
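  # Illustrative sketch (not in the original source): Serialize() and
  # Restore() are symmetric, so for any opcode op:
  #   state = _QueuedOpCode(op).Serialize()
  #   clone = _QueuedOpCode.Restore(state)
  # yields a clone with the same status, result, log and timestamps.
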
162 """In-memory job representation.
163
164 This is what we use to track the user-submitted jobs. Locking must
165 be taken care of by users of this class.
166
167 @type queue: L{JobQueue}
168 @ivar queue: the parent queue
169 @ivar id: the job ID
170 @type ops: list
171 @ivar ops: the list of _QueuedOpCode that constitute the job
172 @type log_serial: int
173 @ivar log_serial: holds the index for the next log entry
174 @ivar received_timestamp: the timestamp for when the job was received
175 @ivar start_timestmap: the timestamp for start of execution
176 @ivar end_timestamp: the timestamp for end of execution
177 @ivar writable: Whether the job is allowed to be modified
178
179 """
180
181 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182 "received_timestamp", "start_timestamp", "end_timestamp",
183 "__weakref__", "processor_lock", "writable"]
184
  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: string
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

268 """Serialize the _JobQueue instance.
269
270 @rtype: dict
271 @return: the serialized state
272
273 """
274 return {
275 "id": self.id,
276 "ops": [op.Serialize() for op in self.ops],
277 "start_timestamp": self.start_timestamp,
278 "end_timestamp": self.end_timestamp,
279 "received_timestamp": self.received_timestamp,
280 }
281

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, fall back to default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
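
  # Illustrative note (not in the original source): opcode priorities are
  # numerically inverted, OP_PRIO_HIGHEST <= priority <= OP_PRIO_LOWEST,
  # so min() above picks the most urgent pending opcode. With pending
  # priorities [0, -10], for example, the job priority would be -10.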

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
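
  # Illustrative example (not in the original source): if the opcode logs
  # hold entries with serials 1..5, GetLogEntries(3) returns only those
  # with serials 4 and 5, while GetLogEntries(None) returns all five.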

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
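
  # Illustrative usage (not in the original source; values assumed):
  #   job.GetInfo(["id", "status"])
  # would return something like ["42", "running"], one element per
  # requested field, in the same order as the fields list.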

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

476 """Initializes this class.
477
478 @type queue: L{JobQueue}
479 @param queue: Job queue
480 @type job: L{_QueuedJob}
481 @param job: Job object
482 @type op: L{_QueuedOpCode}
483 @param op: OpCode
484
485 """
486 assert queue, "Queue is missing"
487 assert job, "Job is missing"
488 assert op, "Opcode is missing"
489
490 self._queue = queue
491 self._job = job
492 self._op = op
493
495 """Raises an exception to cancel the job if asked to.
496
497 """
498
499 if self._op.status == constants.OP_STATUS_CANCELING:
500 logging.debug("Canceling opcode")
501 raise CancelJob()
502
  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

565 """Submits jobs for processing.
566
567 See L{JobQueue.SubmitManyJobs}.
568
569 """
570
571 return self._queue.SubmitManyJobs(jobs)
572

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

650 """Callback for inotify.
651
652 """
653 if not notifier_enabled:
654 self._inotify_handler.enable()
655
  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

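  # Note on the conversion above (added for clarity, not in the original
  # source): pyinotify's check_events() expects its timeout in milliseconds,
  # hence "timeout * 1000" for a timeout given in seconds.
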
672 """Closes underlying notifier and its file descriptor.
673
674 """
675 self._notifier.stop()
676
680 """Initializes this class.
681
682 @type filename: string
683 @param filename: Path to job file
684
685 """
686 self._filewaiter = None
687 self._filename = filename
688
  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: the inotify waiter is only created on the first call, so
    # callers which never need to wait don't pay the setup cost. Returning
    # True makes the caller re-check the job file, covering the window
    # before the watch was established.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

709 """Closes underlying waiter.
710
711 """
712 if self._filewaiter:
713 self._filewaiter.Close()
714
717 """Helper class using inotify to wait for changes in a job file.
718
719 This class takes a previous job status and serial, and alerts the client when
720 the current job status has changed.
721
722 """
723 @staticmethod
725 if counter.next() > 0:
726
727
728
729 time.sleep(0.1)
730
731 job = job_load_fn()
732 if not job:
733 raise errors.JobLost()
734
735 result = check_fn(job)
736 if result is None:
737 raise utils.RetryAgain()
738
739 return result
740
  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

797 """Gets the next timeout if necessary.
798
799 """
800 if self._next is None:
801 self._next = self._fn()
802
804 """Returns the next timeout.
805
806 """
807 self._Advance()
808 return self._next
809
811 """Returns the current timeout and advances the internal state.
812
813 """
814 self._Advance()
815 result = self._next
816 self._next = None
817 return result
818

class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

840 """Creates a new timeout strategy.
841
842 """
843 self._timeout_strategy = \
844 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
845
847 """Checks whether priority can and should be increased.
848
849 Called when locks couldn't be acquired.
850
851 """
852 op = self.op
853
854
855
856 if (self._timeout_strategy.Peek() is None and
857 op.priority > constants.OP_PRIO_HIGHEST):
858 logging.debug("Increasing priority")
859 op.priority -= 1
860 self._ResetTimeoutStrategy()
861 return True
862
863 return False
864
866 """Returns the next lock acquire timeout.
867
868 """
869 return self._timeout_strategy.Next()
870
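  # Illustrative note (not in the original source): with the default
  # timeout strategy, GetNextLockTimeout() yields a series of growing
  # timeouts and eventually None, which means "acquire locks blockingly";
  # once the series is exhausted, CheckPriorityIncrease() bumps the opcode
  # priority one step (numerically one lower) and restarts the series.
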

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # processed
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Dependency failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

1020 """Processes one opcode and returns the result.
1021
1022 """
1023 op = opctx.op
1024
1025 assert op.status == constants.OP_STATUS_WAITING
1026
1027 timeout = opctx.GetNextLockTimeout()
1028
1029 try:
1030
1031 result = self.opexec_fn(op.input,
1032 _OpExecCallbacks(self.queue, self.job, op),
1033 timeout=timeout, priority=op.priority)
1034 except mcpu.LockAcquireTimeout:
1035 assert timeout is not None, "Received timeout for blocking acquire"
1036 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1037
1038 assert op.status in (constants.OP_STATUS_WAITING,
1039 constants.OP_STATUS_CANCELING)
1040
1041
1042 if op.status == constants.OP_STATUS_CANCELING:
1043 return (constants.OP_STATUS_CANCELING, None)
1044
1045
1046 return (constants.OP_STATUS_WAITING, None)
1047 except CancelJob:
1048 logging.exception("%s: Canceling job", opctx.log_prefix)
1049 assert op.status == constants.OP_STATUS_CANCELING
1050 return (constants.OP_STATUS_CANCELING, None)
1051 except Exception, err:
1052 logging.exception("%s: Caught exception in %s",
1053 opctx.log_prefix, opctx.summary)
1054 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1055 else:
1056 logging.debug("%s: %s successful",
1057 opctx.log_prefix, opctx.summary)
1058 return (constants.OP_STATUS_SUCCESS, result)
1059
1061 """Continues execution of a job.
1062
1063 @param _nextop_fn: Callback function for tests
1064 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1065 be deferred and C{WAITDEP} if the dependency manager
1066 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1067
1068 """
1069 queue = self.queue
1070 job = self.job
1071
1072 logging.debug("Processing job %s", job.id)
1073
1074 queue.acquire(shared=1)
1075 try:
1076 opcount = len(job.ops)
1077
1078 assert job.writable, "Expected writable job"
1079
1080
1081 if job.CalcStatus() in constants.JOBS_FINALIZED:
1082 return self.FINISHED
1083
1084
1085 if job.cur_opctx:
1086 opctx = job.cur_opctx
1087 job.cur_opctx = None
1088 else:
1089 if __debug__ and _nextop_fn:
1090 _nextop_fn()
1091 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1092
1093 op = opctx.op
1094
1095
1096 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1097 constants.OP_STATUS_CANCELING)
1098 for i in job.ops[opctx.index + 1:])
1099
1100 assert op.status in (constants.OP_STATUS_QUEUED,
1101 constants.OP_STATUS_WAITING,
1102 constants.OP_STATUS_CANCELING)
1103
1104 assert (op.priority <= constants.OP_PRIO_LOWEST and
1105 op.priority >= constants.OP_PRIO_HIGHEST)
1106
1107 waitjob = None
1108
1109 if op.status != constants.OP_STATUS_CANCELING:
1110 assert op.status in (constants.OP_STATUS_QUEUED,
1111 constants.OP_STATUS_WAITING)
1112
1113
1114 if self._MarkWaitlock(job, op):
1115
1116 queue.UpdateJobUnlocked(job)
1117
1118 assert op.status == constants.OP_STATUS_WAITING
1119 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1120 assert job.start_timestamp and op.start_timestamp
1121 assert waitjob is None
1122
1123
1124 waitjob = self._CheckDependencies(queue, job, opctx)
1125
1126 assert op.status in (constants.OP_STATUS_WAITING,
1127 constants.OP_STATUS_CANCELING,
1128 constants.OP_STATUS_ERROR)
1129
1130 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1131 constants.OP_STATUS_ERROR)):
1132 logging.info("%s: opcode %s waiting for locks",
1133 opctx.log_prefix, opctx.summary)
1134
1135 assert not opctx.jobdeps, "Not all dependencies were removed"
1136
1137 queue.release()
1138 try:
1139 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1140 finally:
1141 queue.acquire(shared=1)
1142
1143 op.status = op_status
1144 op.result = op_result
1145
1146 assert not waitjob
1147
1148 if op.status == constants.OP_STATUS_WAITING:
1149
1150 assert not op.end_timestamp
1151 else:
1152
1153 op.end_timestamp = TimeStampNow()
1154
1155 if op.status == constants.OP_STATUS_CANCELING:
1156 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1157 for i in job.ops[opctx.index:])
1158 else:
1159 assert op.status in constants.OPS_FINALIZED
1160
1161 if op.status == constants.OP_STATUS_WAITING or waitjob:
1162 finalize = False
1163
1164 if not waitjob and opctx.CheckPriorityIncrease():
1165
1166 queue.UpdateJobUnlocked(job)
1167
1168
1169 job.cur_opctx = opctx
1170
1171 assert (op.priority <= constants.OP_PRIO_LOWEST and
1172 op.priority >= constants.OP_PRIO_HIGHEST)
1173
1174
1175 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1176
1177 else:
1178
1179 assert (opctx.index == 0 or
1180 compat.all(i.status == constants.OP_STATUS_SUCCESS
1181 for i in job.ops[:opctx.index]))
1182
1183
1184 job.cur_opctx = None
1185
1186 if op.status == constants.OP_STATUS_SUCCESS:
1187 finalize = False
1188
1189 elif op.status == constants.OP_STATUS_ERROR:
1190
1191 assert errors.GetEncodedError(job.ops[opctx.index].result)
1192
1193 to_encode = errors.OpExecError("Preceding opcode failed")
1194 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1195 _EncodeOpError(to_encode))
1196 finalize = True
1197
1198
1199 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1200 errors.GetEncodedError(i.result)
1201 for i in job.ops[opctx.index:])
1202
1203 elif op.status == constants.OP_STATUS_CANCELING:
1204 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1205 "Job canceled by request")
1206 finalize = True
1207
1208 else:
1209 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1210
1211 if opctx.index == (opcount - 1):
1212
1213 finalize = True
1214
1215 if finalize:
1216
1217 job.Finalize()
1218
1219
1220
1221 queue.UpdateJobUnlocked(job)
1222
1223 assert not waitjob
1224
1225 if finalize:
1226 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1227 return self.FINISHED
1228
1229 assert not waitjob or queue.depmgr.JobWaiting(job)
1230
1231 if waitjob:
1232 return self.WAITDEP
1233 else:
1234 return self.DEFER
1235 finally:
1236 assert job.writable, "Job became read-only while being processed"
1237 queue.release()
1238
1241 """The actual job workers.
1242
1243 """
1245 """Job executor.
1246
1247 @type job: L{_QueuedJob}
1248 @param job: the job to be processed
1249
1250 """
1251 assert job.writable, "Expected writable job"
1252
1253
1254
1255
1256 job.processor_lock.acquire()
1257 try:
1258 return self._RunTaskInner(job)
1259 finally:
1260 job.processor_lock.release()
1261

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
        L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)

1330 """Simple class implementing a job-processing workerpool.
1331
1332 """
1338
1341 """Keeps track of job dependencies.
1342
1343 """
1344 (WAIT,
1345 ERROR,
1346 CANCEL,
1347 CONTINUE,
1348 WRONGSTATUS) = range(1, 6)
1349
1350 - def __init__(self, getstatus_fn, enqueue_fn):
1351 """Initializes this class.
1352
1353 """
1354 self._getstatus_fn = getstatus_fn
1355 self._enqueue_fn = enqueue_fn
1356
1357 self._waiters = {}
1358 self._lock = locking.SharedLock("JobDepMgr")
1359
  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested):
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up
    # with the same priority.
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove since it's finished and there's nothing to wait for
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

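  # Illustrative sketch (not in the original source; values assumed):
  #   (res, msg) = depmgr.CheckAndRegister(job, "123",
  #                                        [constants.JOB_STATUS_SUCCESS])
  # returns (CONTINUE, ...) once job 123 succeeded, (WAIT, ...) while it is
  # still running (the calling job is re-enqueued later), (CANCEL, ...) if
  # it was canceled, and (WRONGSTATUS, ...) if it e.g. failed.
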
1443 """Remove all jobs without actual waiters.
1444
1445 """
1446 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1447 if not waiters]:
1448 del self._waiters[job_id]
1449
1451 """Notifies all jobs waiting for a certain job ID.
1452
1453 @attention: Do not call until L{CheckAndRegister} returned a status other
1454 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1455 @type job_id: string
1456 @param job_id: Job ID
1457
1458 """
1459 assert ht.TString(job_id)
1460
1461 self._lock.acquire()
1462 try:
1463 self._RemoveEmptyWaitersUnlocked()
1464
1465 jobs = self._waiters.pop(job_id, None)
1466 finally:
1467 self._lock.release()
1468
1469 if jobs:
1470
1471 logging.debug("Re-adding %s jobs which were waiting for job %s",
1472 len(jobs), job_id)
1473 self._enqueue_fn(jobs)
1474
1477 """Decorator for "public" functions.
1478
1479 This function should be used for all 'public' functions. That is,
1480 functions usually called from other classes. Note that this should
1481 be applied only to methods (not plain functions), since it expects
1482 that the decorated function is called with a first argument that has
1483 a '_queue_filelock' argument.
1484
1485 @warning: Use this decorator only after locking.ssynchronized
1486
1487 Example::
1488 @locking.ssynchronized(_LOCK)
1489 @_RequireOpenQueue
1490 def Example(self):
1491 pass
1492
1493 """
1494 def wrapper(self, *args, **kwargs):
1495
1496 assert self._queue_filelock is not None, "Queue should be open"
1497 return fn(self, *args, **kwargs)
1498 return wrapper
1499
1502 """Queue used to manage the jobs.
1503
1504 """
1506 """Constructor for JobQueue.
1507
1508 The constructor will initialize the job queue object and then
1509 start loading the current jobs from disk, either for starting them
1510 (if they were queue) or for aborting them (if they were already
1511 running).
1512
1513 @type context: GanetiContext
1514 @param context: the context object for access to the configuration
1515 data and other ganeti objects
1516
1517 """
1518 self.context = context
1519 self._memcache = weakref.WeakValueDictionary()
1520 self._my_hostname = netutils.Hostname.GetSysName()
1521
1522
1523
1524
1525
1526
1527 self._lock = locking.SharedLock("JobQueue")
1528
1529 self.acquire = self._lock.acquire
1530 self.release = self._lock.release
1531
1532
1533
1534 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1535
1536
1537 self._last_serial = jstore.ReadSerial()
1538 assert self._last_serial is not None, ("Serial file was modified between"
1539 " check in jstore and here")
1540
1541
1542 self._nodes = dict((n.name, n.primary_ip)
1543 for n in self.context.cfg.GetAllNodesInfo().values()
1544 if n.master_candidate)
1545
1546
1547 self._nodes.pop(self._my_hostname, None)
1548
1549
1550
1551 self._queue_size = 0
1552 self._UpdateQueueSizeUnlocked()
1553 self._drained = jstore.CheckDrainFlag()
1554
1555
1556 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1557 self._EnqueueJobs)
1558 self.context.glm.AddToLockMonitor(self.depmgr)
1559
1560
1561 self._wpool = _JobQueueWorkerPool(self)
1562 try:
1563 self._InspectQueue()
1564 except:
1565 self._wpool.TerminateWorkers()
1566 raise
1567
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication
      # of the job ids
      self._nodes.pop(node_name, None)
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      logging.error("More than half of the nodes failed")
1712
1714 """Helper for returning the node name/ip list.
1715
1716 @rtype: (list, list)
1717 @return: a tuple of two lists, the first one with the node
1718 names and the second one with the node addresses
1719
1720 """
1721
1722 name_list = self._nodes.keys()
1723 addr_list = [self._nodes[name] for name in name_list]
1724 return name_list, addr_list
1725
1727 """Writes a file locally and then replicates it to all nodes.
1728
1729 This function will replace the contents of a file on the local
1730 node and then replicate it to all the other nodes we have.
1731
1732 @type file_name: str
1733 @param file_name: the path of the file to be replicated
1734 @type data: str
1735 @param data: the new contents of the file
1736 @type replicate: boolean
1737 @param replicate: whether to spread the changes to the remote nodes
1738
1739 """
1740 getents = runtime.GetEnts()
1741 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1742 gid=getents.masterd_gid)
1743
1744 if replicate:
1745 names, addrs = self._GetNodeIp()
1746 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1747 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1748
1750 """Renames a file locally and then replicate the change.
1751
1752 This function will rename a file in the local queue directory
1753 and then replicate this rename to all the other nodes we have.
1754
1755 @type rename: list of (old, new)
1756 @param rename: List containing tuples mapping old to new names
1757
1758 """
1759
1760 for old, new in rename:
1761 utils.RenameFile(old, new, mkdir=True)
1762
1763
1764 names, addrs = self._GetNodeIp()
1765 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1766 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1767
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)}, but if we want to change
    the job id format this will abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job queue: _FormatJobID: No numeric job id")
    if job_id < 0:
      raise errors.ProgrammerError("Job queue: _FormatJobID: negative job id")

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

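  # Illustrative example (not in the original source): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" maps to directory "1"
  # (12345 / 10000 == 1 in Python 2 integer division), so its archived
  # path ends in ".../1/job-12345".
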
1802 """Generates a new job identifier.
1803
1804 Job identifiers are unique during the lifetime of a cluster.
1805
1806 @type count: integer
1807 @param count: how many serials to return
1808 @rtype: str
1809 @return: a string representing the job identifier.
1810
1811 """
1812 assert ht.TPositiveInt(count)
1813
1814
1815 serial = self._last_serial + count
1816
1817
1818 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1819 "%s\n" % serial, True)
1820
1821 result = [self._FormatJobID(v)
1822 for v in range(self._last_serial + 1, serial + 1)]
1823
1824
1825 self._last_serial = serial
1826
1827 assert len(result) == count
1828
1829 return result
1830
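  # Illustrative example (not in the original source): with
  # _last_serial == 3, _NewSerialsUnlocked(2) writes "5" to the serial
  # file and returns ["4", "5"], after which _last_serial == 5.
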
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

1880 """Loads a job from the disk or memory.
1881
1882 Given a job id, this will return the cached job object if
1883 existing, or try to load the job from the disk. If loading from
1884 disk, it will also add the job to the cache.
1885
1886 @param job_id: the job id
1887 @rtype: L{_QueuedJob} or None
1888 @return: either None or the job object
1889
1890 """
1891 job = self._memcache.get(job_id, None)
1892 if job:
1893 logging.debug("Found job %s in memcache", job_id)
1894 assert job.writable, "Found read-only job in memcache"
1895 return job
1896
1897 try:
1898 job = self._LoadJobFromDisk(job_id, False)
1899 if job is None:
1900 return job
1901 except errors.JobFileCorrupted:
1902 old_path = self._GetJobPath(job_id)
1903 new_path = self._GetArchivedJobPath(job_id)
1904 if old_path == new_path:
1905
1906 logging.exception("Can't parse job %s", job_id)
1907 else:
1908
1909 logging.exception("Can't parse job %s, will archive.", job_id)
1910 self._RenameFilesUnlocked([(old_path, new_path)])
1911 return None
1912
1913 assert job.writable, "Job just loaded is not writable"
1914
1915 self._memcache[job_id] = job
1916 logging.debug("Added job %s to the cache", job_id)
1917 return job
1918
1920 """Load the given job file from disk.
1921
1922 Given a job file, read, load and restore it in a _QueuedJob format.
1923
1924 @type job_id: string
1925 @param job_id: job identifier
1926 @type try_archived: bool
1927 @param try_archived: Whether to try loading an archived job
1928 @rtype: L{_QueuedJob} or None
1929 @return: either None or the job object
1930
1931 """
1932 path_functions = [(self._GetJobPath, True)]
1933
1934 if try_archived:
1935 path_functions.append((self._GetArchivedJobPath, False))
1936
1937 raw_data = None
1938 writable_default = None
1939
1940 for (fn, writable_default) in path_functions:
1941 filepath = fn(job_id)
1942 logging.debug("Loading job from %s", filepath)
1943 try:
1944 raw_data = utils.ReadFile(filepath)
1945 except EnvironmentError, err:
1946 if err.errno != errno.ENOENT:
1947 raise
1948 else:
1949 break
1950
1951 if not raw_data:
1952 return None
1953
1954 if writable is None:
1955 writable = writable_default
1956
1957 try:
1958 data = serializer.LoadJson(raw_data)
1959 job = _QueuedJob.Restore(self, data, writable)
1960 except Exception, err:
1961 raise errors.JobFileCorrupted(err)
1962
1963 return job
1964
1966 """Load the given job file from disk.
1967
1968 Given a job file, read, load and restore it in a _QueuedJob format.
1969 In case of error reading the job, it gets returned as None, and the
1970 exception is logged.
1971
1972 @type job_id: string
1973 @param job_id: job identifier
1974 @type try_archived: bool
1975 @param try_archived: Whether to try loading an archived job
1976 @rtype: L{_QueuedJob} or None
1977 @return: either None or the job object
1978
1979 """
1980 try:
1981 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1982 except (errors.JobFileCorrupted, EnvironmentError):
1983 logging.exception("Can't load/parse job %s", job_id)
1984 return None
1985
1987 """Update the queue size.
1988
1989 """
1990 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1991
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created
    # when the lock is exclusive
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: list
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

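  # Illustrative example (not in the original source): when several jobs are
  # submitted in one call, an opcode in the third job may depend on (-1, ...),
  # i.e. the job submitted just before it; _ResolveJobDependencies() replaces
  # the relative ID with the real job ID assigned during submission.
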
2126 """Create and store multiple jobs.
2127
2128 @see: L{_SubmitJobUnlocked}
2129
2130 """
2131 results = []
2132 added_jobs = []
2133
2134 def resolve_fn(job_idx, reljobid):
2135 assert reljobid < 0
2136 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2137
2138 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2139 for op in ops:
2140 if getattr(op, opcodes.DEPEND_ATTR, None):
2141 (status, data) = \
2142 self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2143 op.depends)
2144 if not status:
2145
2146 assert ht.TNonEmptyString(data), "No error message"
2147 break
2148
2149 op.depends = data
2150 else:
2151 try:
2152 job = self._SubmitJobUnlocked(job_id, ops)
2153 except errors.GenericError, err:
2154 status = False
2155 data = self._FormatSubmitError(str(err), ops)
2156 else:
2157 status = True
2158 data = job_id
2159 added_jobs.append(job)
2160
2161 results.append((status, data))
2162
2163 return (results, added_jobs)
2164
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

2176 """Helper function to add jobs to worker pool's queue.
2177
2178 @type jobs: list
2179 @param jobs: List of all jobs
2180
2181 """
2182 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2183 self._wpool.AddManyTasks([(job, ) for job in jobs],
2184 priority=[job.CalcPriority() for job in jobs])
2185
2187 """Gets the status of a job for dependencies.
2188
2189 @type job_id: string
2190 @param job_id: Job ID
2191 @raise errors.JobLost: If job can't be found
2192
2193 """
2194 if not isinstance(job_id, basestring):
2195 job_id = self._FormatJobID(job_id)
2196
2197
2198
2199
2200 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2201
2202 assert not job.writable, "Got writable job"
2203
2204 if job:
2205 return job.CalcStatus()
2206
2207 raise errors.JobLost("Job %s not found" % job_id)
2208
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

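  # Illustrative usage (not in the original source; values assumed): a LUXI
  # client polls with
  #   queue.WaitForJobChanges("42", ["status"], prev_info, prev_serial, 10.0)
  # and receives either the new (job info, log entries) tuple, None if the
  # job was lost, or constants.JOB_NOTCHANGED after the 10 second timeout.
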
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. canceled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time in seconds to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

2411 """Returns a list of jobs in queue.
2412
2413 @type job_ids: list
2414 @param job_ids: sequence of job identifiers or None for all
2415 @type fields: list
2416 @param fields: names of fields to return
2417 @rtype: list
2418 @return: list one element per job, each element being list with
2419 the requested fields
2420
2421 """
2422 jobs = []
2423 list_all = False
2424 if not job_ids:
2425
2426
2427 job_ids = self._GetJobIDsUnlocked()
2428 list_all = True
2429
2430 for job_id in job_ids:
2431 job = self.SafeLoadJobFromDisk(job_id, True)
2432 if job is not None:
2433 jobs.append(job.GetInfo(fields))
2434 elif not list_all:
2435 jobs.append(None)
2436
2437 return jobs
2438
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None