31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48 import os
49
50 try:
51
52 from pyinotify import pyinotify
53 except ImportError:
54 import pyinotify
55
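# Note (added for clarity): the try/except import above handles both layouts
# of the pyinotify distribution -- some versions ship a package exposing a
# "pyinotify.pyinotify" submodule, while others provide a flat "pyinotify"
# module.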
56 from ganeti import asyncnotifier
57 from ganeti import constants
58 from ganeti import serializer
59 from ganeti import workerpool
60 from ganeti import locking
61 from ganeti import luxi
62 from ganeti import opcodes
63 from ganeti import opcodes_base
64 from ganeti import errors
65 from ganeti import mcpu
66 from ganeti import utils
67 from ganeti import jstore
68 import ganeti.rpc.node as rpc
69 from ganeti import runtime
70 from ganeti import netutils
71 from ganeti import compat
72 from ganeti import ht
73 from ganeti import query
74 from ganeti import qlang
75 from ganeti import pathutils
76 from ganeti import vcluster
77 from ganeti.cmdlib import cluster
78
79
80 JOBQUEUE_THREADS = 1
81
82
83 _LOCK = "_lock"
84 _QUEUE = "_queue"
85
86
87 _GetIdAttr = operator.attrgetter("id")
91 """Special exception to cancel a job.
92
93 """
94
97 """Returns the current timestamp.
98
99 @rtype: tuple
100 @return: the current time in the (seconds, microseconds) format
101
102 """
103 return utils.SplitTime(time.time())
104
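# Illustrative note (not part of the original module): timestamps throughout
# this file are (seconds, microseconds) tuples. Assuming utils.SplitTime
# splits a float UNIX timestamp into whole seconds and microseconds, e.g.:
#
#   utils.SplitTime(1234567890.123456)  # -> (1234567890, 123456)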
112
115 """Encapsulates an opcode object.
116
117 @ivar log: holds the execution log and consists of tuples
118 of the form C{(log_serial, timestamp, level, message)}
119 @ivar input: the OpCode we encapsulate
120 @ivar status: the current status
121 @ivar result: the result of the LU execution
122 @ivar start_timestamp: timestamp for the start of the execution
123 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
124 @ivar end_timestamp: timestamp for the end of the execution
125
126 """
127 __slots__ = ["input", "status", "result", "log", "priority",
128 "start_timestamp", "exec_timestamp", "end_timestamp",
129 "__weakref__"]
130
132 """Initializes instances of this class.
133
134 @type op: L{opcodes.OpCode}
135 @param op: the opcode we encapsulate
136
137 """
138 self.input = op
139 self.status = constants.OP_STATUS_QUEUED
140 self.result = None
141 self.log = []
142 self.start_timestamp = None
143 self.exec_timestamp = None
144 self.end_timestamp = None
145
146
147 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
148
149 @classmethod
151 """Restore the _QueuedOpCode from the serialized form.
152
153 @type state: dict
154 @param state: the serialized state
155 @rtype: _QueuedOpCode
156 @return: a new _QueuedOpCode instance
157
158 """
159 obj = _QueuedOpCode.__new__(cls)
160 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
161 obj.status = state["status"]
162 obj.result = state["result"]
163 obj.log = state["log"]
164 obj.start_timestamp = state.get("start_timestamp", None)
165 obj.exec_timestamp = state.get("exec_timestamp", None)
166 obj.end_timestamp = state.get("end_timestamp", None)
167 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
168 return obj
169
171 """Serializes this _QueuedOpCode.
172
173 @rtype: dict
174 @return: the dictionary holding the serialized state
175
176 """
177 return {
178 "input": self.input.__getstate__(),
179 "status": self.status,
180 "result": self.result,
181 "log": self.log,
182 "start_timestamp": self.start_timestamp,
183 "exec_timestamp": self.exec_timestamp,
184 "end_timestamp": self.end_timestamp,
185 "priority": self.priority,
186 }
187
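# Illustrative sketch (not part of the original module): Serialize() and
# Restore() above are meant to be inverses, so a queued opcode should survive
# a round trip through the serialized dict, e.g.:
#
#   qop = _QueuedOpCode(opcodes.OpClusterVerify())  # any opcode instance
#   restored = _QueuedOpCode.Restore(qop.Serialize())
#   assert restored.status == qop.status == constants.OP_STATUS_QUEUED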
190 """In-memory job representation.
191
192 This is what we use to track the user-submitted jobs. Locking must
193 be taken care of by users of this class.
194
195 @type queue: L{JobQueue}
196 @ivar queue: the parent queue
197 @ivar id: the job ID
198 @type ops: list
199 @ivar ops: the list of _QueuedOpCode that constitute the job
200 @type log_serial: int
201 @ivar log_serial: holds the index for the next log entry
202 @ivar received_timestamp: the timestamp for when the job was received
203 @ivar start_timestamp: the timestamp for start of execution
204 @ivar end_timestamp: the timestamp for end of execution
205 @ivar writable: Whether the job is allowed to be modified
206
207 """
208
209 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
210 "received_timestamp", "start_timestamp", "end_timestamp",
211 "processor_lock", "writable", "archived",
212 "livelock", "process_id",
213 "__weakref__"]
214
216 """Extend the reason trail
217
218 Add the reason for all the opcodes of this job to be executed.
219
220 """
221 count = 0
222 for queued_op in self.ops:
223 op = queued_op.input
224 if pickup:
225 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
226 else:
227 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
228 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
229 reason_src_prefix)
230 reason_text = "job=%d;index=%d" % (self.id, count)
231 reason = getattr(op, "reason", [])
232 reason.append((reason_src, reason_text, utils.EpochNano()))
233 op.reason = reason
234 count = count + 1
235
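# Illustrative note (not part of the original module): each reason-trail entry
# appended above is a (source, text, timestamp) tuple, e.g. something like
#
#   ("gnt:opcode:clusterverify", "job=4711;index=0", 1234567890123456789)
#
# where the source string comes from opcodes_base.NameToReasonSrc and the
# timestamp is in nanoseconds (utils.EpochNano); the exact source string
# format is an assumption here.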
236 def __init__(self, queue, job_id, ops, writable):
237 """Constructor for the _QueuedJob.
238
239 @type queue: L{JobQueue}
240 @param queue: our parent queue
241 @type job_id: job_id
242 @param job_id: our job id
243 @type ops: list
244 @param ops: the list of opcodes we hold, which will be encapsulated
245 in _QueuedOpCodes
246 @type writable: bool
247 @param writable: Whether job can be modified
248
249 """
250 if not ops:
251 raise errors.GenericError("A job needs at least one opcode")
252
253 self.queue = queue
254 self.id = int(job_id)
255 self.ops = [_QueuedOpCode(op) for op in ops]
256 self.AddReasons()
257 self.log_serial = 0
258 self.received_timestamp = TimeStampNow()
259 self.start_timestamp = None
260 self.end_timestamp = None
261 self.archived = False
262 self.livelock = None
263 self.process_id = None
264
265 self._InitInMemory(self, writable)
266
267 assert not self.archived, "New jobs can not be marked as archived"
268
269 @staticmethod
271 """Initializes in-memory variables.
272
273 """
274 obj.writable = writable
275 obj.ops_iter = None
276 obj.cur_opctx = None
277
278
279 if writable:
280 obj.processor_lock = threading.Lock()
281 else:
282 obj.processor_lock = None
283
285 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
286 "id=%s" % self.id,
287 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
288
289 return "<%s at %#x>" % (" ".join(status), id(self))
290
291 @classmethod
292 def Restore(cls, queue, state, writable, archived):
293 """Restore a _QueuedJob from serialized state:
294
295 @type queue: L{JobQueue}
296 @param queue: to which queue the restored job belongs
297 @type state: dict
298 @param state: the serialized state
299 @type writable: bool
300 @param writable: Whether job can be modified
301 @type archived: bool
302 @param archived: Whether job was already archived
303 @rtype: _QueuedJob
304 @return: the restored _QueuedJob instance
305
306 """
307 obj = _QueuedJob.__new__(cls)
308 obj.queue = queue
309 obj.id = int(state["id"])
310 obj.received_timestamp = state.get("received_timestamp", None)
311 obj.start_timestamp = state.get("start_timestamp", None)
312 obj.end_timestamp = state.get("end_timestamp", None)
313 obj.archived = archived
314 obj.livelock = state.get("livelock", None)
315 obj.process_id = state.get("process_id", None)
316 if obj.process_id is not None:
317 obj.process_id = int(obj.process_id)
318
319 obj.ops = []
320 obj.log_serial = 0
321 for op_state in state["ops"]:
322 op = _QueuedOpCode.Restore(op_state)
323 for log_entry in op.log:
324 obj.log_serial = max(obj.log_serial, log_entry[0])
325 obj.ops.append(op)
326
327 cls._InitInMemory(obj, writable)
328
329 return obj
330
332 """Serialize the _JobQueue instance.
333
334 @rtype: dict
335 @return: the serialized state
336
337 """
338 return {
339 "id": self.id,
340 "ops": [op.Serialize() for op in self.ops],
341 "start_timestamp": self.start_timestamp,
342 "end_timestamp": self.end_timestamp,
343 "received_timestamp": self.received_timestamp,
344 "livelock": self.livelock,
345 "process_id": self.process_id,
346 }
347
400
402 """Gets the current priority for this job.
403
404 Only unfinished opcodes are considered. When all are done, the default
405 priority is used.
406
407 @rtype: int
408
409 """
410 priorities = [op.priority for op in self.ops
411 if op.status not in constants.OPS_FINALIZED]
412
413 if not priorities:
414
415 return constants.OP_PRIO_DEFAULT
416
417 return min(priorities)
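# Illustrative note (not part of the original module): opcode priorities use
# the "lower value means more important" convention (cf. the assertions on
# OP_PRIO_HIGHEST/OP_PRIO_LOWEST further down), so the min() above selects the
# priority of the most important still-pending opcode, e.g. min([0, -10, 5])
# yields -10.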
418
420 """Selectively returns the log entries.
421
422 @type newer_than: None or int
423 @param newer_than: if this is None, return all log entries,
424 otherwise return only the log entries with serial higher
425 than this value
426 @rtype: list
427 @return: the list of the log entries selected
428
429 """
430 if newer_than is None:
431 serial = -1
432 else:
433 serial = newer_than
434
435 entries = []
436 for op in self.ops:
437 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
438
439 return entries
440
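# Illustrative sketch (not part of the original module): callers typically
# poll this method incrementally by remembering the highest serial seen so
# far and passing it as newer_than on the next call (the method name is
# stripped from this listing; GetLogEntries is assumed), e.g.:
#
#   entries = job.GetLogEntries(None)             # everything so far
#   last_serial = max([e[0] for e in entries] or [0])
#   new_entries = job.GetLogEntries(last_serial)  # strictly newer entries only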
442 """Mark unfinished opcodes with a given status and result.
443
444 This is a utility function for marking all opcodes that are running or
445 waiting to be run with a given status. Opcodes which are already
446 finalized are not changed.
447
448 @param status: a given opcode status
449 @param result: the opcode result
450
451 """
452 not_marked = True
453 for op in self.ops:
454 if op.status in constants.OPS_FINALIZED:
455 assert not_marked, "Finalized opcodes found after non-finalized ones"
456 continue
457 op.status = status
458 op.result = result
459 not_marked = False
460
462 """Marks the job as finalized.
463
464 """
465 self.end_timestamp = TimeStampNow()
466
491
493 """Changes the job priority.
494
495 @type priority: int
496 @param priority: New priority
497 @rtype: tuple; (bool, string)
498 @return: Boolean describing whether job's priority was successfully changed
499 and a text message
500
501 """
502 status = self.CalcStatus()
503
504 if status in constants.JOBS_FINALIZED:
505 return (False, "Job %s is finished" % self.id)
506 elif status == constants.JOB_STATUS_CANCELING:
507 return (False, "Job %s is cancelling" % self.id)
508 else:
509 assert status in (constants.JOB_STATUS_QUEUED,
510 constants.JOB_STATUS_WAITING,
511 constants.JOB_STATUS_RUNNING)
512
513 changed = False
514 for op in self.ops:
515 if (op.status == constants.OP_STATUS_RUNNING or
516 op.status in constants.OPS_FINALIZED):
517 assert not changed, \
518 ("Found opcode for which priority should not be changed after"
519 " priority has been changed for previous opcodes")
520 continue
521
522 assert op.status in (constants.OP_STATUS_QUEUED,
523 constants.OP_STATUS_WAITING)
524
525 changed = True
526
527
528 op.priority = priority
529
530 if changed:
531 return (True, ("Priorities of pending opcodes for job %s have been"
532 " changed to %s" % (self.id, priority)))
533 else:
534 return (False, "Job %s had no pending opcodes" % self.id)
535
537 """Sets the job's process ID
538
539 @type pid: int
540 @param pid: the process ID
541
542 """
543 status = self.CalcStatus()
544
545 if status in (constants.JOB_STATUS_QUEUED,
546 constants.JOB_STATUS_WAITING):
547 if self.process_id is not None:
548 logging.warning("Replacing the process id %s of job %s with %s",
549 self.process_id, self.id, pid)
550 self.process_id = pid
551 else:
552 logging.warning("Can set pid only for queued/waiting jobs")
553
556
558 """Initializes this class.
559
560 @type queue: L{JobQueue}
561 @param queue: Job queue
562 @type job: L{_QueuedJob}
563 @param job: Job object
564 @type op: L{_QueuedOpCode}
565 @param op: OpCode
566
567 """
568 super(_OpExecCallbacks, self).__init__()
569
570 assert queue, "Queue is missing"
571 assert job, "Job is missing"
572 assert op, "Opcode is missing"
573
574 self._queue = queue
575 self._job = job
576 self._op = op
577
579 """Raises an exception to cancel the job if asked to.
580
581 """
582
583 if self._op.status == constants.OP_STATUS_CANCELING:
584 logging.debug("Canceling opcode")
585 raise CancelJob()
586
587 @locking.ssynchronized(_QUEUE, shared=1)
589 """Mark the opcode as running, not lock-waiting.
590
591 This is called from the mcpu code as a notifier function, when the LU is
592 finally about to start the Exec() method. Of course, to have end-user
593 visible results, the opcode must be initially (before calling into
594 Processor.ExecOpCode) set to OP_STATUS_WAITING.
595
596 """
597 assert self._op in self._job.ops
598 assert self._op.status in (constants.OP_STATUS_WAITING,
599 constants.OP_STATUS_CANCELING)
600
601
602 self._CheckCancel()
603
604 logging.debug("Opcode is now running")
605
606 self._op.status = constants.OP_STATUS_RUNNING
607 self._op.exec_timestamp = TimeStampNow()
608
609
610 self._queue.UpdateJobUnlocked(self._job)
611
612 @locking.ssynchronized(_QUEUE, shared=1)
614 """Mark opcode again as lock-waiting.
615
616 This is called from the mcpu code just after calling PrepareRetry.
617 The opcode will now try to acquire its locks again (hopefully more of them this time).
618
619 """
620 self._op.status = constants.OP_STATUS_WAITING
621 logging.debug("Opcode will be retried. Back to waiting.")
622
623 @locking.ssynchronized(_QUEUE, shared=1)
625 """Internal feedback append function, with locks
626
627 @type timestamp: tuple (int, int)
628 @param timestamp: timestamp of the log message
629
630 @type log_type: string
631 @param log_type: log type (one of Types.ELogType)
632
633 @type log_msgs: any
634 @param log_msgs: log data to append
635 """
636
637
638
639
640
641 if log_type == constants.ELOG_MESSAGE_LIST:
642 log_type = constants.ELOG_MESSAGE
643 else:
644 log_msgs = [log_msgs]
645
646 for msg in log_msgs:
647 self._job.log_serial += 1
648 self._op.log.append((self._job.log_serial, timestamp, log_type, msg))
649 self._queue.UpdateJobUnlocked(self._job, replicate=False)
650
651
653 """Append a log entry.
654
655 Calling conventions:
656 arg[0]: (optional) string, message type (Types.ELogType)
657 arg[1]: data to be interpreted as a message
658 """
659 assert len(args) < 3
660
661
662 if len(args) == 1:
663 log_type = constants.ELOG_MESSAGE
664 log_msg = args[0]
665 else:
666 (log_type, log_msg) = args
667
668
669
670 timestamp = utils.SplitTime(time.time())
671 self._AppendFeedback(timestamp, log_type, log_msg)
672
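# Illustrative sketch (not part of the original module): both calling
# conventions documented above are accepted, e.g. (feedback_fn standing for
# this Feedback method as handed to the logical units; the name is assumed):
#
#   feedback_fn("creating disks")                               # plain message
#   feedback_fn(constants.ELOG_MESSAGE, "creating disks")       # explicit type
#   feedback_fn(constants.ELOG_MESSAGE_LIST, ["step 1", "step 2"])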
684
686 """Submits jobs for processing.
687
688 See L{JobQueue.SubmitManyJobs}.
689
690 """
691
692 return self._queue.SubmitManyJobs(jobs)
693
705
709 """Initializes this class.
710
711 """
712 self._fn = fn
713 self._next = None
714
716 """Gets the next timeout if necessary.
717
718 """
719 if self._next is None:
720 self._next = self._fn()
721
723 """Returns the next timeout.
724
725 """
726 self._Advance()
727 return self._next
728
730 """Returns the current timeout and advances the internal state.
731
732 """
733 self._Advance()
734 result = self._next
735 self._next = None
736 return result
737
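# Illustrative sketch (not part of the original module): the wrapper above
# caches one value from the underlying strategy, so Peek() can be called any
# number of times without consuming a timeout, while Next() consumes it, e.g.:
#
#   wrapper = _TimeoutStrategyWrapper(iter([1.0, 2.0]).next)
#   wrapper.Peek()   # -> 1.0
#   wrapper.Peek()   # -> 1.0 (still cached)
#   wrapper.Next()   # -> 1.0 (consumed)
#   wrapper.Next()   # -> 2.0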
738
739 class _OpExecContext:
740 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
741 """Initializes this class.
742
743 """
744 self.op = op
745 self.index = index
746 self.log_prefix = log_prefix
747 self.summary = op.input.Summary()
748
749
750 if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
751 self.jobdeps = op.input.depends[:]
752 else:
753 self.jobdeps = None
754
755 self._timeout_strategy_factory = timeout_strategy_factory
756 self._ResetTimeoutStrategy()
757
759 """Creates a new timeout strategy.
760
761 """
762 self._timeout_strategy = \
763 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
764
766 """Checks whether priority can and should be increased.
767
768 Called when locks couldn't be acquired.
769
770 """
771 op = self.op
772
773
774
775 if (self._timeout_strategy.Peek() is None and
776 op.priority > constants.OP_PRIO_HIGHEST):
777 logging.debug("Increasing priority")
778 op.priority -= 1
779 self._ResetTimeoutStrategy()
780 return True
781
782 return False
783
785 """Returns the next lock acquire timeout.
786
787 """
788 return self._timeout_strategy.Next()
789
792 (DEFER,
793 WAITDEP,
794 FINISHED) = range(1, 4)
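# Descriptive note (not part of the original module): these are the possible
# results of processing one round of a job (see the docstring further down):
# FINISHED means the job is fully processed, DEFER means processing should be
# deferred and the job re-queued, and WAITDEP means the dependency manager
# (_JobDependencyManager) will re-schedule the job when appropriate.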
795
798 """Initializes this class.
799
800 """
801 self.queue = queue
802 self.opexec_fn = opexec_fn
803 self.job = job
804 self._timeout_strategy_factory = _timeout_strategy_factory
805
806 @staticmethod
808 """Locates the next opcode to run.
809
810 @type job: L{_QueuedJob}
811 @param job: Job object
812 @param timeout_strategy_factory: Callable to create new timeout strategy
813
814 """
815
816
817
818
819 if job.ops_iter is None:
820 job.ops_iter = enumerate(job.ops)
821
822
823 while True:
824 try:
825 (idx, op) = job.ops_iter.next()
826 except StopIteration:
827 raise errors.ProgrammerError("Called for a finished job")
828
829 if op.status == constants.OP_STATUS_RUNNING:
830
831 raise errors.ProgrammerError("Called for job marked as running")
832
833 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
834 timeout_strategy_factory)
835
836 if op.status not in constants.OPS_FINALIZED:
837 return opctx
838
839
840
841
842
843 logging.info("%s: opcode %s already processed, skipping",
844 opctx.log_prefix, opctx.summary)
845
846 @staticmethod
881
882 @staticmethod
884 """Checks if an opcode has dependencies and if so, processes them.
885
886 @type queue: L{JobQueue}
887 @param queue: Queue object
888 @type job: L{_QueuedJob}
889 @param job: Job object
890 @type opctx: L{_OpExecContext}
891 @param opctx: Opcode execution context
892 @rtype: bool
893 @return: Whether opcode will be re-scheduled by dependency tracker
894
895 """
896 op = opctx.op
897
898 result = False
899
900 while opctx.jobdeps:
901 (dep_job_id, dep_status) = opctx.jobdeps[0]
902
903 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
904 dep_status)
905 assert ht.TNonEmptyString(depmsg), "No dependency message"
906
907 logging.info("%s: %s", opctx.log_prefix, depmsg)
908
909 if depresult == _JobDependencyManager.CONTINUE:
910
911 opctx.jobdeps.pop(0)
912
913 elif depresult == _JobDependencyManager.WAIT:
914
915
916 result = True
917 break
918
919 elif depresult == _JobDependencyManager.CANCEL:
920
921 job.Cancel()
922 assert op.status == constants.OP_STATUS_CANCELING
923 break
924
925 elif depresult in (_JobDependencyManager.WRONGSTATUS,
926 _JobDependencyManager.ERROR):
927
928 op.status = constants.OP_STATUS_ERROR
929 op.result = _EncodeOpError(errors.OpExecError(depmsg))
930 break
931
932 else:
933 raise errors.ProgrammerError("Unknown dependency result '%s'" %
934 depresult)
935
936 return result
937
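# Illustrative note (not part of the original module): opctx.jobdeps is a copy
# of the opcode's "depends" attribute, a list of (job_id, required_statuses)
# pairs; an empty status list means any finalized status is acceptable (see
# CheckAndRegister below), e.g. something like:
#
#   [(4711, [constants.JOB_STATUS_SUCCESS]), (4712, [])]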
939 """Processes one opcode and returns the result.
940
941 """
942 op = opctx.op
943
944 assert op.status in (constants.OP_STATUS_WAITING,
945 constants.OP_STATUS_CANCELING)
946
947
948 if op.status == constants.OP_STATUS_CANCELING:
949 return (constants.OP_STATUS_CANCELING, None)
950
951 timeout = opctx.GetNextLockTimeout()
952
953 try:
954
955 result = self.opexec_fn(op.input,
956 _OpExecCallbacks(self.queue, self.job, op),
957 timeout=timeout)
958 except mcpu.LockAcquireTimeout:
959 assert timeout is not None, "Received timeout for blocking acquire"
960 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
961
962 assert op.status in (constants.OP_STATUS_WAITING,
963 constants.OP_STATUS_CANCELING)
964
965
966 if op.status == constants.OP_STATUS_CANCELING:
967 return (constants.OP_STATUS_CANCELING, None)
968
969
970 return (constants.OP_STATUS_WAITING, None)
971 except CancelJob:
972 logging.exception("%s: Canceling job", opctx.log_prefix)
973 assert op.status == constants.OP_STATUS_CANCELING
974 return (constants.OP_STATUS_CANCELING, None)
975
976 except Exception, err:
977 logging.exception("%s: Caught exception in %s",
978 opctx.log_prefix, opctx.summary)
979 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
980 else:
981 logging.debug("%s: %s successful",
982 opctx.log_prefix, opctx.summary)
983 return (constants.OP_STATUS_SUCCESS, result)
984
986 """Continues execution of a job.
987
988 @param _nextop_fn: Callback function for tests
989 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
990 be deferred and C{WAITDEP} if the dependency manager
991 (L{_JobDependencyManager}) will re-schedule the job when appropriate
992
993 """
994 queue = self.queue
995 job = self.job
996
997 logging.debug("Processing job %s", job.id)
998
999 queue.acquire(shared=1)
1000 try:
1001 opcount = len(job.ops)
1002
1003 assert job.writable, "Expected writable job"
1004
1005
1006 if job.CalcStatus() in constants.JOBS_FINALIZED:
1007 return self.FINISHED
1008
1009
1010 if job.cur_opctx:
1011 opctx = job.cur_opctx
1012 job.cur_opctx = None
1013 else:
1014 if __debug__ and _nextop_fn:
1015 _nextop_fn()
1016 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1017
1018 op = opctx.op
1019
1020
1021 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1022 constants.OP_STATUS_CANCELING)
1023 for i in job.ops[opctx.index + 1:])
1024
1025 assert op.status in (constants.OP_STATUS_QUEUED,
1026 constants.OP_STATUS_WAITING,
1027 constants.OP_STATUS_CANCELING)
1028
1029 assert (op.priority <= constants.OP_PRIO_LOWEST and
1030 op.priority >= constants.OP_PRIO_HIGHEST)
1031
1032 waitjob = None
1033
1034 if op.status != constants.OP_STATUS_CANCELING:
1035 assert op.status in (constants.OP_STATUS_QUEUED,
1036 constants.OP_STATUS_WAITING)
1037
1038
1039 if self._MarkWaitlock(job, op):
1040
1041 queue.UpdateJobUnlocked(job)
1042
1043 assert op.status == constants.OP_STATUS_WAITING
1044 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1045 assert job.start_timestamp and op.start_timestamp
1046 assert waitjob is None
1047
1048
1049 waitjob = self._CheckDependencies(queue, job, opctx)
1050
1051 assert op.status in (constants.OP_STATUS_WAITING,
1052 constants.OP_STATUS_CANCELING,
1053 constants.OP_STATUS_ERROR)
1054
1055 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1056 constants.OP_STATUS_ERROR)):
1057 logging.info("%s: opcode %s waiting for locks",
1058 opctx.log_prefix, opctx.summary)
1059
1060 assert not opctx.jobdeps, "Not all dependencies were removed"
1061
1062 queue.release()
1063 try:
1064 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1065 finally:
1066 queue.acquire(shared=1)
1067
1068 op.status = op_status
1069 op.result = op_result
1070
1071 assert not waitjob
1072
1073 if op.status in (constants.OP_STATUS_WAITING,
1074 constants.OP_STATUS_QUEUED):
1075
1076
1077 assert not op.end_timestamp
1078 else:
1079
1080 op.end_timestamp = TimeStampNow()
1081
1082 if op.status == constants.OP_STATUS_CANCELING:
1083 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1084 for i in job.ops[opctx.index:])
1085 else:
1086 assert op.status in constants.OPS_FINALIZED
1087
1088 if op.status == constants.OP_STATUS_QUEUED:
1089
1090 assert not waitjob
1091
1092 finalize = False
1093
1094
1095 job.cur_opctx = None
1096
1097
1098 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1099
1100 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1101 finalize = False
1102
1103 if not waitjob and opctx.CheckPriorityIncrease():
1104
1105 queue.UpdateJobUnlocked(job)
1106
1107
1108 job.cur_opctx = opctx
1109
1110 assert (op.priority <= constants.OP_PRIO_LOWEST and
1111 op.priority >= constants.OP_PRIO_HIGHEST)
1112
1113
1114 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1115
1116 else:
1117
1118 assert (opctx.index == 0 or
1119 compat.all(i.status == constants.OP_STATUS_SUCCESS
1120 for i in job.ops[:opctx.index]))
1121
1122
1123 job.cur_opctx = None
1124
1125 if op.status == constants.OP_STATUS_SUCCESS:
1126 finalize = False
1127
1128 elif op.status == constants.OP_STATUS_ERROR:
1129
1130
1131
1132
1133 to_encode = errors.OpExecError("Preceding opcode failed")
1134 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1135 _EncodeOpError(to_encode))
1136 finalize = True
1137 elif op.status == constants.OP_STATUS_CANCELING:
1138 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1139 "Job canceled by request")
1140 finalize = True
1141
1142 else:
1143 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1144
1145 if opctx.index == (opcount - 1):
1146
1147 finalize = True
1148
1149 if finalize:
1150
1151 job.Finalize()
1152
1153
1154
1155 queue.UpdateJobUnlocked(job)
1156
1157 assert not waitjob
1158
1159 if finalize:
1160 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1161 return self.FINISHED
1162
1163 assert not waitjob or queue.depmgr.JobWaiting(job)
1164
1165 if waitjob:
1166 return self.WAITDEP
1167 else:
1168 return self.DEFER
1169 finally:
1170 assert job.writable, "Job became read-only while being processed"
1171 queue.release()
1172
1195
1198 """The actual job workers.
1199
1200 """
1202 """Job executor.
1203
1204 @type job: L{_QueuedJob}
1205 @param job: the job to be processed
1206
1207 """
1208 assert job.writable, "Expected writable job"
1209
1210
1211
1212
1213 job.processor_lock.acquire()
1214 try:
1215 return self._RunTaskInner(job)
1216 finally:
1217 job.processor_lock.release()
1218
1239
1240 @staticmethod
1242 """Updates the worker thread name to include a short summary of the opcode.
1243
1244 @param setname_fn: Callable setting worker thread name
1245 @param execop_fn: Callable for executing opcode (usually
1246 L{mcpu.Processor.ExecOpCode})
1247
1248 """
1249 setname_fn(op)
1250 try:
1251 return execop_fn(op, *args, **kwargs)
1252 finally:
1253 setname_fn(None)
1254
1255 @staticmethod
1257 """Sets the worker thread name.
1258
1259 @type job: L{_QueuedJob}
1260 @type op: L{opcodes.OpCode}
1261
1262 """
1263 parts = ["Job%s" % job.id]
1264
1265 if op:
1266 parts.append(op.TinySummary())
1267
1268 return "/".join(parts)
1269
1272 """Simple class implementing a job-processing workerpool.
1273
1274 """
1280
1283 """Keeps track of job dependencies.
1284
1285 """
1286 (WAIT,
1287 ERROR,
1288 CANCEL,
1289 CONTINUE,
1290 WRONGSTATUS) = range(1, 6)
1291
1292 def __init__(self, getstatus_fn, enqueue_fn):
1293 """Initializes this class.
1294
1295 """
1296 self._getstatus_fn = getstatus_fn
1297 self._enqueue_fn = enqueue_fn
1298
1299 self._waiters = {}
1300 self._lock = locking.SharedLock("JobDepMgr")
1301
1302 @locking.ssynchronized(_LOCK, shared=1)
1304 """Retrieves information about waiting jobs.
1305
1306 @type requested: set
1307 @param requested: Requested information, see C{query.LQ_*}
1308
1309 """
1310
1311
1312
1313 return [("job/%s" % job_id, None, None,
1314 [("job", [job.id for job in waiters])])
1315 for job_id, waiters in self._waiters.items()
1316 if waiters]
1317
1318 @locking.ssynchronized(_LOCK, shared=1)
1320 """Checks if a job is waiting.
1321
1322 """
1323 return compat.any(job in jobs
1324 for jobs in self._waiters.values())
1325
1326 @locking.ssynchronized(_LOCK)
1328 """Checks if a dependency job has the requested status.
1329
1330 If the other job is not yet in a finalized status, the calling job will be
1331 notified (re-added to the workerpool) at a later point.
1332
1333 @type job: L{_QueuedJob}
1334 @param job: Job object
1335 @type dep_job_id: int
1336 @param dep_job_id: ID of dependency job
1337 @type dep_status: list
1338 @param dep_status: Required status
1339
1340 """
1341 assert ht.TJobId(job.id)
1342 assert ht.TJobId(dep_job_id)
1343 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1344
1345 if job.id == dep_job_id:
1346 return (self.ERROR, "Job can't depend on itself")
1347
1348
1349 try:
1350 status = self._getstatus_fn(dep_job_id)
1351 except errors.JobLost, err:
1352 return (self.ERROR, "Dependency error: %s" % err)
1353
1354 assert status in constants.JOB_STATUS_ALL
1355
1356 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1357
1358 if status not in constants.JOBS_FINALIZED:
1359
1360 job_id_waiters.add(job)
1361 return (self.WAIT,
1362 "Need to wait for job %s, wanted status '%s'" %
1363 (dep_job_id, dep_status))
1364
1365
1366 if job in job_id_waiters:
1367 job_id_waiters.remove(job)
1368
1369 if (status == constants.JOB_STATUS_CANCELED and
1370 constants.JOB_STATUS_CANCELED not in dep_status):
1371 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1372
1373 elif not dep_status or status in dep_status:
1374 return (self.CONTINUE,
1375 "Dependency job %s finished with status '%s'" %
1376 (dep_job_id, status))
1377
1378 else:
1379 return (self.WRONGSTATUS,
1380 "Dependency job %s finished with status '%s',"
1381 " not one of '%s' as required" %
1382 (dep_job_id, status, utils.CommaJoin(dep_status)))
1383
1385 """Remove all jobs without actual waiters.
1386
1387 """
1388 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1389 if not waiters]:
1390 del self._waiters[job_id]
1391
1393 """Notifies all jobs waiting for a certain job ID.
1394
1395 @attention: Do not call until L{CheckAndRegister} returned a status other
1396 than C{WAIT} for C{job_id}, or behaviour is undefined
1397 @type job_id: int
1398 @param job_id: Job ID
1399
1400 """
1401 assert ht.TJobId(job_id)
1402
1403 self._lock.acquire()
1404 try:
1405 self._RemoveEmptyWaitersUnlocked()
1406
1407 jobs = self._waiters.pop(job_id, None)
1408 finally:
1409 self._lock.release()
1410
1411 if jobs:
1412
1413 logging.debug("Re-adding %s jobs which were waiting for job %s",
1414 len(jobs), job_id)
1415 self._enqueue_fn(jobs)
1416
1419 """Queue used to manage the jobs.
1420
1421 """
1423 """Constructor for JobQueue.
1424
1425 The constructor will initialize the job queue object and then
1426 start loading the current jobs from disk, either for starting them
1427 (if they were queued) or for aborting them (if they were already
1428 running).
1429
1430 @type context: GanetiContext
1431 @param context: the context object for access to the configuration
1432 data and other ganeti objects
1433
1434 """
1435 self.primary_jid = None
1436 self.context = context
1437 self._memcache = weakref.WeakValueDictionary()
1438 self._my_hostname = netutils.Hostname.GetSysName()
1439
1440
1441
1442
1443
1444
1445 self._lock = locking.SharedLock("JobQueue")
1446
1447 self.acquire = self._lock.acquire
1448 self.release = self._lock.release
1449
1450
1451 self._last_serial = jstore.ReadSerial()
1452 assert self._last_serial is not None, ("Serial file was modified between"
1453 " check in jstore and here")
1454
1455
1456 self._nodes = dict((n.name, n.primary_ip)
1457 for n in cfg.GetAllNodesInfo().values()
1458 if n.master_candidate)
1459
1460
1461 self._nodes.pop(self._my_hostname, None)
1462
1463
1464
1465 self._queue_size = None
1466 self._UpdateQueueSizeUnlocked()
1467 assert ht.TInt(self._queue_size)
1468
1469
1470 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1471 self._EnqueueJobs)
1472
1473
1474 self._wpool = _JobQueueWorkerPool(self)
1475
1477 """Load a job from the job queue
1478
1479 Pick up a job that already is in the job queue and start/resume it.
1480
1481 """
1482 if self.primary_jid:
1483 logging.warning("Job process asked to pick up %s, but already has %s",
1484 job_id, self.primary_jid)
1485
1486 self.primary_jid = int(job_id)
1487
1488 job = self._LoadJobUnlocked(job_id)
1489
1490 if job is None:
1491 logging.warning("Job %s could not be read", job_id)
1492 return
1493
1494 job.AddReasons(pickup=True)
1495
1496 status = job.CalcStatus()
1497 if status == constants.JOB_STATUS_QUEUED:
1498 job.SetPid(os.getpid())
1499 self._EnqueueJobsUnlocked([job])
1500 logging.info("Restarting job %s", job.id)
1501
1502 elif status in (constants.JOB_STATUS_RUNNING,
1503 constants.JOB_STATUS_WAITING,
1504 constants.JOB_STATUS_CANCELING):
1505 logging.warning("Unfinished job %s found: %s", job.id, job)
1506
1507 if status == constants.JOB_STATUS_WAITING:
1508 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1509 job.SetPid(os.getpid())
1510 self._EnqueueJobsUnlocked([job])
1511 logging.info("Restarting job %s", job.id)
1512 else:
1513 to_encode = errors.OpExecError("Unclean master daemon shutdown")
1514 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1515 _EncodeOpError(to_encode))
1516 job.Finalize()
1517
1518 self.UpdateJobUnlocked(job)
1519
1520 @locking.ssynchronized(_LOCK)
1523
1525 """Gets RPC runner with context.
1526
1527 """
1528 return rpc.JobQueueRunner(self.context, address_list)
1529
1530 @locking.ssynchronized(_LOCK)
1532 """Register a new node with the queue.
1533
1534 @type node: L{objects.Node}
1535 @param node: the node object to be added
1536
1537 """
1538 node_name = node.name
1539 assert node_name != self._my_hostname
1540
1541
1542 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1543 msg = result.fail_msg
1544 if msg:
1545 logging.warning("Cannot cleanup queue directory on node %s: %s",
1546 node_name, msg)
1547
1548 if not node.master_candidate:
1549
1550 self._nodes.pop(node_name, None)
1551
1552 return
1553
1554
1555 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1556
1557
1558 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1559
1560
1561 addrs = [node.primary_ip]
1562
1563 for file_name in files:
1564
1565 content = utils.ReadFile(file_name)
1566
1567 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1568 file_name, content)
1569 msg = result[node_name].fail_msg
1570 if msg:
1571 logging.error("Failed to upload file %s to node %s: %s",
1572 file_name, node_name, msg)
1573
1574 msg = result[node_name].fail_msg
1575 if msg:
1576 logging.error("Failed to set queue drained flag on node %s: %s",
1577 node_name, msg)
1578
1579 self._nodes[node_name] = node.primary_ip
1580
1581 @locking.ssynchronized(_LOCK)
1583 """Callback called when removing nodes from the cluster.
1584
1585 @type node_name: str
1586 @param node_name: the name of the node to remove
1587
1588 """
1589 self._nodes.pop(node_name, None)
1590
1591 @staticmethod
1593 """Verifies the status of an RPC call.
1594
1595 Since we aim to keep consistency should this node (the current
1596 master) fail, we will log errors if our rpc calls fail, and especially
1597 log the case when more than half of the nodes fail.
1598
1599 @param result: the data as returned from the rpc call
1600 @type nodes: list
1601 @param nodes: the list of nodes we made the call to
1602 @type failmsg: str
1603 @param failmsg: the identifier to be used for logging
1604
1605 """
1606 failed = []
1607 success = []
1608
1609 for node in nodes:
1610 msg = result[node].fail_msg
1611 if msg:
1612 failed.append(node)
1613 logging.error("RPC call %s (%s) failed on node %s: %s",
1614 result[node].call, failmsg, node, msg)
1615 else:
1616 success.append(node)
1617
1618
1619 if (len(success) + 1) < len(failed):
1620
1621 logging.error("More than half of the nodes failed")
1622
1624 """Helper for returning the node name/ip list.
1625
1626 @rtype: (list, list)
1627 @return: a tuple of two lists, the first one with the node
1628 names and the second one with the node addresses
1629
1630 """
1631
1632 name_list = self._nodes.keys()
1633 addr_list = [self._nodes[name] for name in name_list]
1634 return name_list, addr_list
1635
1637 """Writes a file locally and then replicates it to all nodes.
1638
1639 This function will replace the contents of a file on the local
1640 node and then replicate it to all the other nodes we have.
1641
1642 @type file_name: str
1643 @param file_name: the path of the file to be replicated
1644 @type data: str
1645 @param data: the new contents of the file
1646 @type replicate: boolean
1647 @param replicate: whether to spread the changes to the remote nodes
1648
1649 """
1650 getents = runtime.GetEnts()
1651 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1652 gid=getents.daemons_gid,
1653 mode=constants.JOB_QUEUE_FILES_PERMS)
1654
1655 if replicate:
1656 names, addrs = self._GetNodeIp()
1657 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1658 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1659
1661 """Renames a file locally and then replicate the change.
1662
1663 This function will rename a file in the local queue directory
1664 and then replicate this rename to all the other nodes we have.
1665
1666 @type rename: list of (old, new)
1667 @param rename: List containing tuples mapping old to new names
1668
1669 """
1670
1671 for old, new in rename:
1672 utils.RenameFile(old, new, mkdir=True)
1673
1674
1675 names, addrs = self._GetNodeIp()
1676 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1677 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1678
1679 @staticmethod
1681 """Returns the job file for a given job id.
1682
1683 @type job_id: str
1684 @param job_id: the job identifier
1685 @rtype: str
1686 @return: the path to the job file
1687
1688 """
1689 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1690
1691 @staticmethod
1693 """Returns the archived job file for a give job id.
1694
1695 @type job_id: str
1696 @param job_id: the job identifier
1697 @rtype: str
1698 @return: the path to the archived job file
1699
1700 """
1701 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1702 jstore.GetArchiveDirectory(job_id),
1703 "job-%s" % job_id)
1704
1705 @staticmethod
1707 """Build list of directories containing job files.
1708
1709 @type archived: bool
1710 @param archived: Whether to include directories for archived jobs
1711 @rtype: list
1712
1713 """
1714 result = [pathutils.QUEUE_DIR]
1715
1716 if archived:
1717 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1718 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1719 utils.ListVisibleFiles(archive_path)))
1720
1721 return result
1722
1723 @classmethod
1725 """Return all known job IDs.
1726
1727 The method only looks at disk because it's a requirement that all
1728 jobs are present on disk (so in the _memcache we don't have any
1729 extra IDs).
1730
1731 @type sort: boolean
1732 @param sort: perform sorting on the returned job ids
1733 @rtype: list
1734 @return: the list of job IDs
1735
1736 """
1737 jlist = []
1738
1739 for path in cls._DetermineJobDirectories(archived):
1740 for filename in utils.ListVisibleFiles(path):
1741 m = constants.JOB_FILE_RE.match(filename)
1742 if m:
1743 jlist.append(int(m.group(1)))
1744
1745 if sort:
1746 jlist.sort()
1747 return jlist
1748
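# Descriptive note (not part of the original module): job files are named
# "job-<id>" (cf. _GetJobPath above), so the regular expression match here
# recovers the numeric job ID, e.g. "job-4711" -> 4711, and any other file in
# the queue directories is ignored.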
1750 """Loads a job from the disk or memory.
1751
1752 Given a job id, this will return the cached job object if
1753 existing, or try to load the job from the disk. If loading from
1754 disk, it will also add the job to the cache.
1755
1756 @type job_id: int
1757 @param job_id: the job id
1758 @rtype: L{_QueuedJob} or None
1759 @return: either None or the job object
1760
1761 """
1762 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1763
1764 job = self._memcache.get(job_id, None)
1765 if job:
1766 logging.debug("Found job %s in memcache", job_id)
1767 assert job.writable, "Found read-only job in memcache"
1768 return job
1769
1770 try:
1771 job = self._LoadJobFromDisk(job_id, False)
1772 if job is None:
1773 return job
1774 except errors.JobFileCorrupted:
1775 old_path = self._GetJobPath(job_id)
1776 new_path = self._GetArchivedJobPath(job_id)
1777 if old_path == new_path:
1778
1779 logging.exception("Can't parse job %s", job_id)
1780 else:
1781
1782 logging.exception("Can't parse job %s, will archive.", job_id)
1783 self._RenameFilesUnlocked([(old_path, new_path)])
1784 return None
1785
1786 assert job.writable, "Job just loaded is not writable"
1787
1788 self._memcache[job_id] = job
1789 logging.debug("Added job %s to the cache", job_id)
1790 return job
1791
1793 """Load the given job file from disk.
1794
1795 Given a job file, read, load and restore it in a _QueuedJob format.
1796
1797 @type job_id: int
1798 @param job_id: job identifier
1799 @type try_archived: bool
1800 @param try_archived: Whether to try loading an archived job
1801 @rtype: L{_QueuedJob} or None
1802 @return: either None or the job object
1803
1804 """
1805 path_functions = [(self._GetJobPath, False)]
1806
1807 if try_archived:
1808 path_functions.append((self._GetArchivedJobPath, True))
1809
1810 raw_data = None
1811 archived = None
1812
1813 for (fn, archived) in path_functions:
1814 filepath = fn(job_id)
1815 logging.debug("Loading job from %s", filepath)
1816 try:
1817 raw_data = utils.ReadFile(filepath)
1818 except EnvironmentError, err:
1819 if err.errno != errno.ENOENT:
1820 raise
1821 else:
1822 break
1823
1824 if not raw_data:
1825 logging.debug("No data available for job %s", job_id)
1826 if int(job_id) == self.primary_jid:
1827 logging.warning("My own job file (%s) disappeared;"
1828 " this should only happy at cluster desctruction",
1829 job_id)
1830 if mcpu.lusExecuting[0] == 0:
1831 logging.warning("Not in execution; cleaning up myself due to missing"
1832 " job file")
1833 logging.shutdown()
1834 os._exit(1)
1835 return None
1836
1837 if writable is None:
1838 writable = not archived
1839
1840 try:
1841 data = serializer.LoadJson(raw_data)
1842 job = _QueuedJob.Restore(self, data, writable, archived)
1843 except Exception, err:
1844 raise errors.JobFileCorrupted(err)
1845
1846 return job
1847
1849 """Load the given job file from disk.
1850
1851 Given a job file, read, load and restore it in a _QueuedJob format.
1852 In case of error reading the job, it gets returned as None, and the
1853 exception is logged.
1854
1855 @type job_id: int
1856 @param job_id: job identifier
1857 @type try_archived: bool
1858 @param try_archived: Whether to try loading an archived job
1859 @rtype: L{_QueuedJob} or None
1860 @return: either None or the job object
1861
1862 """
1863 try:
1864 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1865 except (errors.JobFileCorrupted, EnvironmentError):
1866 logging.exception("Can't load/parse job %s", job_id)
1867 return None
1868
1870 """Update the queue size.
1871
1872 """
1873 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1874
1875 @classmethod
1881
1882 @staticmethod
1889
1890 @staticmethod
1892 """Resolves relative job IDs in dependencies.
1893
1894 @type resolve_fn: callable
1895 @param resolve_fn: Function to resolve a relative job ID
1896 @type deps: list
1897 @param deps: Dependencies
1898 @rtype: tuple; (boolean, string or list)
1899 @return: If successful (first tuple item), the returned list contains
1900 resolved job IDs along with the requested status; if not successful,
1901 the second element is an error message
1902
1903 """
1904 result = []
1905
1906 for (dep_job_id, dep_status) in deps:
1907 if ht.TRelativeJobId(dep_job_id):
1908 assert ht.TInt(dep_job_id) and dep_job_id < 0
1909 try:
1910 job_id = resolve_fn(dep_job_id)
1911 except IndexError:
1912
1913 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
1914 else:
1915 job_id = dep_job_id
1916
1917 result.append((job_id, dep_status))
1918
1919 return (True, result)
1920
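# Illustrative sketch (not part of the original module): relative job IDs are
# negative and refer to jobs submitted earlier in the same submission, so with
# a resolve_fn mapping -1 to the most recently submitted job ID this would
# turn, e.g.,
#
#   [(-1, [constants.JOB_STATUS_SUCCESS])]
#
# into (True, [(4711, [constants.JOB_STATUS_SUCCESS])]), assuming 4711 was the
# job submitted just before.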
1921 @locking.ssynchronized(_LOCK)
1923 """Helper function to add jobs to worker pool's queue.
1924
1925 @type jobs: list
1926 @param jobs: List of all jobs
1927
1928 """
1929 return self._EnqueueJobsUnlocked(jobs)
1930
1932 """Helper function to add jobs to worker pool's queue.
1933
1934 @type jobs: list
1935 @param jobs: List of all jobs
1936
1937 """
1938 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
1939 self._wpool.AddManyTasks([(job, ) for job in jobs],
1940 priority=[job.CalcPriority() for job in jobs],
1941 task_id=map(_GetIdAttr, jobs))
1942
1944 """Gets the status of a job for dependencies.
1945
1946 @type job_id: int
1947 @param job_id: Job ID
1948 @raise errors.JobLost: If job can't be found
1949
1950 """
1951
1952
1953
1954 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1955
1956 if job:
1957 assert not job.writable, "Got writable job"
1958
1959 if job:
1960 return job.CalcStatus()
1961
1962 raise errors.JobLost("Job %s not found" % job_id)
1963
1965 """Update a job's on disk storage.
1966
1967 After a job has been modified, this function needs to be called in
1968 order to write the changes to disk and replicate them to the other
1969 nodes.
1970
1971 @type job: L{_QueuedJob}
1972 @param job: the changed job
1973 @type replicate: boolean
1974 @param replicate: whether to replicate the change to remote nodes
1975
1976 """
1977 if __debug__:
1978 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1979 assert (finalized ^ (job.end_timestamp is None))
1980 assert job.writable, "Can't update read-only job"
1981 assert not job.archived, "Can't update archived job"
1982
1983 filename = self._GetJobPath(job.id)
1984 data = serializer.DumpJson(job.Serialize())
1985 logging.debug("Writing job %s to %s", job.id, filename)
1986 self._UpdateJobQueueFile(filename, data, replicate)
1987
1989 """Checks if a job has been finalized.
1990
1991 @type job_id: int
1992 @param job_id: Job identifier
1993 @rtype: boolean
1994 @return: True if the job has been finalized,
1995 False if the timeout has been reached,
1996 None if the job doesn't exist
1997
1998 """
1999 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2000 if job is not None:
2001 return job.CalcStatus() in constants.JOBS_FINALIZED
2002 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
2003
2004
2005
2006 return True
2007 else:
2008 return None
2009
2010 @locking.ssynchronized(_LOCK)
2012 """Cancels a job.
2013
2014 This will only succeed if the job has not started yet.
2015
2016 @type job_id: int
2017 @param job_id: job ID of job to be cancelled.
2018
2019 """
2020 logging.info("Cancelling job %s", job_id)
2021
2022 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2023
2024 @locking.ssynchronized(_LOCK)
2026 """Changes a job's priority.
2027
2028 @type job_id: int
2029 @param job_id: ID of the job whose priority should be changed
2030 @type priority: int
2031 @param priority: New priority
2032
2033 """
2034 logging.info("Changing priority of job %s to %s", job_id, priority)
2035
2036 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2037 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2038 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2039 (priority, allowed))
2040
2041 def fn(job):
2042 (success, msg) = job.ChangePriority(priority)
2043
2044 if success:
2045 try:
2046 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2047 except workerpool.NoSuchTask:
2048 logging.debug("Job %s is not in workerpool at this time", job.id)
2049
2050 return (success, msg)
2051
2052 return self._ModifyJobUnlocked(job_id, fn)
2053
2055 """Modifies a job.
2056
2057 @type job_id: int
2058 @param job_id: Job ID
2059 @type mod_fn: callable
2060 @param mod_fn: Modifying function, receiving job object as parameter,
2061 returning tuple of (status boolean, message string)
2062
2063 """
2064 job = self._LoadJobUnlocked(job_id)
2065 if not job:
2066 logging.debug("Job %s not found", job_id)
2067 return (False, "Job %s not found" % job_id)
2068
2069 assert job.writable, "Can't modify read-only job"
2070 assert not job.archived, "Can't modify archived job"
2071
2072 (success, msg) = mod_fn(job)
2073
2074 if success:
2075
2076
2077 self.UpdateJobUnlocked(job)
2078
2079 return (success, msg)
2080
2082 """Archives jobs.
2083
2084 @type jobs: list of L{_QueuedJob}
2085 @param jobs: Job objects
2086 @rtype: int
2087 @return: Number of archived jobs
2088
2089 """
2090 archive_jobs = []
2091 rename_files = []
2092 for job in jobs:
2093 assert job.writable, "Can't archive read-only job"
2094 assert not job.archived, "Can't archive already archived job"
2095
2096 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2097 logging.debug("Job %s is not yet done", job.id)
2098 continue
2099
2100 archive_jobs.append(job)
2101
2102 old = self._GetJobPath(job.id)
2103 new = self._GetArchivedJobPath(job.id)
2104 rename_files.append((old, new))
2105
2106
2107 self._RenameFilesUnlocked(rename_files)
2108
2109 logging.debug("Successfully archived job(s) %s",
2110 utils.CommaJoin(job.id for job in archive_jobs))
2111
2112
2113
2114
2115
2116 self._UpdateQueueSizeUnlocked()
2117 return len(archive_jobs)
2118
2119 def _Query(self, fields, qfilter):
2120 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2121 namefield="id")
2122
2123
2124
2125
2126 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2127
2128 job_ids = qobj.RequestedNames()
2129
2130 list_all = (job_ids is None)
2131
2132 if list_all:
2133
2134
2135 job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2136
2137 jobs = []
2138
2139 for job_id in job_ids:
2140 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2141 if job is not None or not list_all:
2142 jobs.append((job_id, job))
2143
2144 return (qobj, jobs, list_all)
2145
2147 """Returns a list of jobs in queue.
2148
2149 @type fields: sequence
2150 @param fields: List of wanted fields
2151 @type qfilter: None or query2 filter (list)
2152 @param qfilter: Query filter
2153
2154 """
2155 (qobj, ctx, _) = self._Query(fields, qfilter)
2156
2157 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2158
2160 """Returns a list of jobs in queue.
2161
2162 @type job_ids: list
2163 @param job_ids: sequence of job identifiers or None for all
2164 @type fields: list
2165 @param fields: names of fields to return
2166 @rtype: list
2167 @return: a list with one element per job, each element being a list with
2168 the requested fields
2169
2170 """
2171
2172 job_ids = [int(jid) for jid in job_ids]
2173 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2174
2175 (qobj, ctx, _) = self._Query(fields, qfilter)
2176
2177 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2178
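# Illustrative note (not part of the original module): the old-style result
# returned above is one list per job, holding the requested field values in
# order; for fields ["id", "status"] it would look something like
#
#   [[4711, "success"], [4712, "running"]]
#
# with the job status strings as defined by the JOB_STATUS_* constants.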
2179 @locking.ssynchronized(_LOCK)
2181 """Prepare to stop the job queue.
2182
2183 Returns whether there are any jobs currently running. If the latter is the
2184 case, the job queue is not yet ready for shutdown. Once this function
2185 returns C{False}, L{Shutdown} can be called without interfering with any job.
2186
2187 @rtype: bool
2188 @return: Whether there are any running jobs
2189
2190 """
2191 return self._wpool.HasRunningTasks()
2192
2193 @locking.ssynchronized(_LOCK)
2195 """Stops the job queue.
2196
2197 This shuts down all the worker threads and closes the queue.
2198
2199 """
2200 self._wpool.TerminateWorkers()
2201