1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48 import os
49
50 try:
51
52 from pyinotify import pyinotify
53 except ImportError:
54 import pyinotify
55
56 from ganeti import asyncnotifier
57 from ganeti import constants
58 from ganeti import serializer
59 from ganeti import workerpool
60 from ganeti import locking
61 from ganeti import luxi
62 from ganeti import opcodes
63 from ganeti import opcodes_base
64 from ganeti import errors
65 from ganeti import mcpu
66 from ganeti import utils
67 from ganeti import jstore
68 import ganeti.rpc.node as rpc
69 from ganeti import runtime
70 from ganeti import netutils
71 from ganeti import compat
72 from ganeti import ht
73 from ganeti import query
74 from ganeti import qlang
75 from ganeti import pathutils
76 from ganeti import vcluster
77 from ganeti.cmdlib import cluster
78
79
# Number of worker threads started for processing jobs (see module docstring)
JOBQUEUE_THREADS = 1


# Attribute names used by the locking.ssynchronized decorators below to find
# the lock object on the decorated instance
_LOCK = "_lock"
_QUEUE = "_queue"


# Key function extracting the "id" attribute, e.g. for sorting job lists
_GetIdAttr = operator.attrgetter("id")
class CancelJob(Exception):
  """Special exception to cancel a job.

  """
94
def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
104
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar stop_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority from the opcode, falling back to the default
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
187
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """

  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "processor_lock", "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]
214
def AddReasons(self, pickup=False):
  """Extend the reason trail

  Add the reason for all the opcodes of this job to be executed.

  @type pickup: bool
  @param pickup: whether the job is being picked up after a daemon
      restart (changes the reason source prefix)

  """
  count = 0
  for queued_op in self.ops:
    op = queued_op.input
    if pickup:
      reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
    else:
      reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
    reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                              reason_src_prefix)
    reason_text = "job=%d;index=%d" % (self.id, count)
    reason = getattr(op, "reason", [])
    reason.append((reason_src, reason_text, utils.EpochNano()))
    op.reason = reason
    count = count + 1
235
def __init__(self, queue, job_id, ops, writable):
  """Constructor for the _QueuedJob.

  @type queue: L{JobQueue}
  @param queue: our parent queue
  @type job_id: job_id
  @param job_id: our job id
  @type ops: list
  @param ops: the list of opcodes we hold, which will be encapsulated
      in _QueuedOpCodes
  @type writable: bool
  @param writable: Whether job can be modified

  """
  if not ops:
    raise errors.GenericError("A job needs at least one opcode")

  self.queue = queue
  self.id = int(job_id)
  self.ops = [_QueuedOpCode(op) for op in ops]
  self.AddReasons()
  self.log_serial = 0
  self.received_timestamp = TimeStampNow()
  self.start_timestamp = None
  self.end_timestamp = None
  self.archived = False
  self.livelock = None
  self.process_id = None

  self._InitInMemory(self, writable)

  assert not self.archived, "New jobs can not be marked as archived"
268
@staticmethod
def _InitInMemory(obj, writable):
  """Initializes in-memory variables.

  @param obj: the job object on which the in-memory-only attributes
      are (re-)initialized
  @type writable: bool
  @param writable: whether the job may be modified/processed

  """
  obj.writable = writable
  obj.ops_iter = None
  obj.cur_opctx = None

  # Read-only jobs are never processed, so they don't need a processor lock
  if writable:
    obj.processor_lock = threading.Lock()
  else:
    obj.processor_lock = None
283
def __repr__(self):
  """Returns a debugging representation including id and opcode summaries.

  """
  status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
            "id=%s" % self.id,
            "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

  return "<%s at %#x>" % (" ".join(status), id(self))
290
@classmethod
def Restore(cls, queue, state, writable, archived):
  """Restore a _QueuedJob from serialized state:

  @type queue: L{JobQueue}
  @param queue: to which queue the restored job belongs
  @type state: dict
  @param state: the serialized state
  @type writable: bool
  @param writable: Whether job can be modified
  @type archived: bool
  @param archived: Whether job was already archived
  @rtype: _JobQueue
  @return: the restored _JobQueue instance

  """
  obj = _QueuedJob.__new__(cls)
  obj.queue = queue
  obj.id = int(state["id"])
  obj.received_timestamp = state.get("received_timestamp", None)
  obj.start_timestamp = state.get("start_timestamp", None)
  obj.end_timestamp = state.get("end_timestamp", None)
  obj.archived = archived
  obj.livelock = state.get("livelock", None)
  obj.process_id = state.get("process_id", None)
  if obj.process_id is not None:
    obj.process_id = int(obj.process_id)

  obj.ops = []
  obj.log_serial = 0
  for op_state in state["ops"]:
    op = _QueuedOpCode.Restore(op_state)
    # Track the highest log serial seen so new entries continue after it
    for log_entry in op.log:
      obj.log_serial = max(obj.log_serial, log_entry[0])
    obj.ops.append(op)

  cls._InitInMemory(obj, writable)

  return obj
330
def Serialize(self):
  """Serialize the _JobQueue instance.

  @rtype: dict
  @return: the serialized state

  """
  return {
    "id": self.id,
    "ops": [op.Serialize() for op in self.ops],
    "start_timestamp": self.start_timestamp,
    "end_timestamp": self.end_timestamp,
    "received_timestamp": self.received_timestamp,
    "livelock": self.livelock,
    "process_id": self.process_id,
    }
347
400
def CalcPriority(self):
  """Gets the current priority for this job.

  Only unfinished opcodes are considered. When all are done, the default
  priority is used.

  @rtype: int

  """
  priorities = [op.priority for op in self.ops
                if op.status not in constants.OPS_FINALIZED]

  if not priorities:
    # All opcodes are done, assume default priority
    return constants.OP_PRIO_DEFAULT

  return min(priorities)
418
def GetLogEntries(self, newer_than):
  """Selectively returns the log entries.

  @type newer_than: None or int
  @param newer_than: if this is None, return all log entries,
      otherwise return only the log entries with serial higher
      than this value
  @rtype: list
  @return: the list of the log entries selected

  """
  if newer_than is None:
    serial = -1
  else:
    serial = newer_than

  entries = []
  for op in self.ops:
    entries.extend(filter(lambda entry: entry[0] > serial, op.log))

  return entries
440
def MarkUnfinishedOps(self, status, result):
  """Mark unfinished opcodes with a given status and result.

  This is an utility function for marking all running or waiting to
  be run opcodes with a given status. Opcodes which are already
  finalised are not changed.

  @param status: a given opcode status
  @param result: the opcode result

  """
  not_marked = True
  for op in self.ops:
    if op.status in constants.OPS_FINALIZED:
      assert not_marked, "Finalized opcodes found after non-finalized ones"
      continue
    op.status = status
    op.result = result
    not_marked = False
460
def Finalize(self):
  """Marks the job as finalized.

  """
  self.end_timestamp = TimeStampNow()
466
491
def ChangePriority(self, priority):
  """Changes the job priority.

  @type priority: int
  @param priority: New priority
  @rtype: tuple; (bool, string)
  @return: Boolean describing whether job's priority was successfully changed
      and a text message

  """
  status = self.CalcStatus()

  if status in constants.JOBS_FINALIZED:
    return (False, "Job %s is finished" % self.id)
  elif status == constants.JOB_STATUS_CANCELING:
    return (False, "Job %s is cancelling" % self.id)
  else:
    assert status in (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_RUNNING)

    changed = False
    for op in self.ops:
      if (op.status == constants.OP_STATUS_RUNNING or
          op.status in constants.OPS_FINALIZED):
        # Once an opcode is running or done, priorities of earlier opcodes
        # must not be touched anymore
        assert not changed, \
          ("Found opcode for which priority should not be changed after"
           " priority has been changed for previous opcodes")
        continue

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING)

      changed = True

      # Set new priority (doesn't modify opcode input)
      op.priority = priority

    if changed:
      return (True, ("Priorities of pending opcodes for job %s have been"
                     " changed to %s" % (self.id, priority)))
    else:
      return (False, "Job %s had no pending opcodes" % self.id)
535
def SetPid(self, pid):
  """Sets the job's process ID

  @type pid: int
  @param pid: the process ID

  """
  status = self.CalcStatus()

  if status in (constants.JOB_STATUS_QUEUED,
                constants.JOB_STATUS_WAITING):
    if self.process_id is not None:
      logging.warning("Replacing the process id %s of job %s with %s",
                      self.process_id, self.id, pid)
    self.process_id = pid
  else:
    logging.warning("Can set pid only for queued/waiting jobs")
553
556
def __init__(self, queue, job, op):
  """Initializes this class.

  @type queue: L{JobQueue}
  @param queue: Job queue
  @type job: L{_QueuedJob}
  @param job: Job object
  @type op: L{_QueuedOpCode}
  @param op: OpCode

  """
  super(_OpExecCallbacks, self).__init__()

  assert queue, "Queue is missing"
  assert job, "Job is missing"
  assert op, "Opcode is missing"

  self._queue = queue
  self._job = job
  self._op = op
577
def _CheckCancel(self):
  """Raises an exception to cancel the job if asked to.

  """
  # Cancel here if we were asked to
  if self._op.status == constants.OP_STATUS_CANCELING:
    logging.debug("Canceling opcode")
    raise CancelJob()
586
@locking.ssynchronized(_QUEUE, shared=1)
def NotifyStart(self):
  """Mark the opcode as running, not lock-waiting.

  This is called from the mcpu code as a notifier function, when the LU is
  finally about to start the Exec() method. Of course, to have end-user
  visible results, the opcode must be initially (before calling into
  Processor.ExecOpCode) set to OP_STATUS_WAITING.

  """
  assert self._op in self._job.ops
  assert self._op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING)

  # Cancel here if we were asked to
  self._CheckCancel()

  logging.debug("Opcode is now running")

  self._op.status = constants.OP_STATUS_RUNNING
  self._op.exec_timestamp = TimeStampNow()

  # And finally replicate the job status
  self._queue.UpdateJobUnlocked(self._job)
611
@locking.ssynchronized(_QUEUE, shared=1)
def NotifyRetry(self):
  """Mark opcode again as lock-waiting.

  This is called from the mcpu code just after calling PrepareRetry.
  The opcode will now again acquire locks (more, hopefully).

  """
  self._op.status = constants.OP_STATUS_WAITING
  logging.debug("Opcode will be retried. Back to waiting.")
622
@locking.ssynchronized(_QUEUE, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
  """Internal feedback append function, with locks

  @type timestamp: tuple (int, int)
  @param timestamp: timestamp of the log message
  @param log_type: log message type, stored verbatim in the entry
  @param log_msg: the log message text

  """
  self._job.log_serial += 1
  self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
  # Log entries are replicated lazily (replicate=False) for performance
  self._queue.UpdateJobUnlocked(self._job, replicate=False)
631
648
660
def SubmitManyJobs(self, jobs):
  """Submits jobs for processing.

  See L{JobQueue.SubmitManyJobs}.

  """
  # Locking is done in job queue
  return self._queue.SubmitManyJobs(jobs)
669
681
class _TimeoutStrategyWrapper:
  """Caches one timeout value produced by a timeout-strategy callback.

  """
  def __init__(self, fn):
    """Initializes this class.

    @param fn: callable returning the next timeout value

    """
    self._fn = fn
    # Cached next timeout; None means "not fetched yet"
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
713
714
class _OpExecContext:
  """Execution context for a single opcode within a job.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: the L{_QueuedOpCode} being executed
    @type index: int
    @param index: position of the opcode within the job
    @type log_prefix: str
    @param log_prefix: prefix used for log messages about this opcode
    @param timeout_strategy_factory: callable creating a new timeout strategy

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy of the dependency list so entries can be popped
    # without modifying the opcode input
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries (next timeout is None, i.e. blocking acquire
    # would be next) and priority can still be increased?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
765
768 (DEFER,
769 WAITDEP,
770 FINISHED) = range(1, 4)
771
774 """Initializes this class.
775
776 """
777 self.queue = queue
778 self.opexec_fn = opexec_fn
779 self.job = job
780 self._timeout_strategy_factory = _timeout_strategy_factory
781
@staticmethod
def _FindNextOpcode(job, timeout_strategy_factory):
  """Locates the next opcode to run.

  @type job: L{_QueuedJob}
  @param job: Job object
  @param timeout_strategy_factory: Callable to create new timeout strategy

  """
  # Create a cached iterator over the opcodes to avoid re-scanning the
  # whole list on every invocation
  if job.ops_iter is None:
    job.ops_iter = enumerate(job.ops)

  # Find next opcode to run
  while True:
    try:
      # Use the next() builtin instead of the Python-2-only .next() method
      (idx, op) = next(job.ops_iter)
    except StopIteration:
      raise errors.ProgrammerError("Called for a finished job")

    if op.status == constants.OP_STATUS_RUNNING:
      # A previous run must have left this opcode marked as running
      raise errors.ProgrammerError("Called for job marked as running")

    opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                           timeout_strategy_factory)

    if op.status not in constants.OPS_FINALIZED:
      return opctx

    # Finalized opcodes (e.g. from a job resumed after daemon restart) are
    # skipped
    logging.info("%s: opcode %s already processed, skipping",
                 opctx.log_prefix, opctx.summary)
821
822 @staticmethod
857
858 @staticmethod
def _CheckDependencies(queue, job, opctx):
  """Checks if an opcode has dependencies and if so, processes them.

  @type queue: L{JobQueue}
  @param queue: Queue object
  @type job: L{_QueuedJob}
  @param job: Job object
  @type opctx: L{_OpExecContext}
  @param opctx: Opcode execution context
  @rtype: bool
  @return: Whether opcode will be re-scheduled by dependency tracker

  """
  op = opctx.op

  result = False

  while opctx.jobdeps:
    (dep_job_id, dep_status) = opctx.jobdeps[0]

    (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                        dep_status)
    assert ht.TNonEmptyString(depmsg), "No dependency message"

    logging.info("%s: %s", opctx.log_prefix, depmsg)

    if depresult == _JobDependencyManager.CONTINUE:
      # Remove dependency and continue with the next one
      opctx.jobdeps.pop(0)

    elif depresult == _JobDependencyManager.WAIT:
      # Need to wait for notification, dependency tracker will re-add job
      # to workerpool
      result = True
      break

    elif depresult == _JobDependencyManager.CANCEL:
      # Dependency was cancelled, cancel this job as well
      job.Cancel()
      assert op.status == constants.OP_STATUS_CANCELING
      break

    elif depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
      # Dependency failed or finished with an unwanted status, job fails
      op.status = constants.OP_STATUS_ERROR
      op.result = _EncodeOpError(errors.OpExecError(depmsg))
      break

    else:
      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

  return result
913
def _ExecOpCodeUnlocked(self, opctx):
  """Processes one opcode and returns the result.

  @type opctx: L{_OpExecContext}
  @param opctx: Opcode execution context
  @rtype: tuple; (opcode status, opcode result)

  """
  op = opctx.op

  assert op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_CANCELING)

  # The very last check if the job was cancelled before trying to execute
  if op.status == constants.OP_STATUS_CANCELING:
    return (constants.OP_STATUS_CANCELING, None)

  timeout = opctx.GetNextLockTimeout()

  try:
    # Make sure not to hold queue lock while calling ExecOpCode
    result = self.opexec_fn(op.input,
                            _OpExecCallbacks(self.queue, self.job, op),
                            timeout=timeout)
  except mcpu.LockAcquireTimeout:
    assert timeout is not None, "Received timeout for blocking acquire"
    logging.debug("Couldn't acquire locks in %0.6fs", timeout)

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Was job cancelled while we were waiting for the lock?
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    # Stay in waitlock while trying to re-acquire lock
    return (constants.OP_STATUS_WAITING, None)
  except CancelJob:
    logging.exception("%s: Canceling job", opctx.log_prefix)
    assert op.status == constants.OP_STATUS_CANCELING
    return (constants.OP_STATUS_CANCELING, None)

  # Use "as" syntax (valid in Python 2.6+ and Python 3) instead of the
  # removed "except Exception, err" form
  except Exception as err:  # pylint: disable=W0703
    logging.exception("%s: Caught exception in %s",
                      opctx.log_prefix, opctx.summary)
    return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
  else:
    logging.debug("%s: %s successful",
                  opctx.log_prefix, opctx.summary)
    return (constants.OP_STATUS_SUCCESS, result)
960
962 """Continues execution of a job.
963
964 @param _nextop_fn: Callback function for tests
965 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
966 be deferred and C{WAITDEP} if the dependency manager
967 (L{_JobDependencyManager}) will re-schedule the job when appropriate
968
969 """
970 queue = self.queue
971 job = self.job
972
973 logging.debug("Processing job %s", job.id)
974
975 queue.acquire(shared=1)
976 try:
977 opcount = len(job.ops)
978
979 assert job.writable, "Expected writable job"
980
981
982 if job.CalcStatus() in constants.JOBS_FINALIZED:
983 return self.FINISHED
984
985
986 if job.cur_opctx:
987 opctx = job.cur_opctx
988 job.cur_opctx = None
989 else:
990 if __debug__ and _nextop_fn:
991 _nextop_fn()
992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993
994 op = opctx.op
995
996
997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998 constants.OP_STATUS_CANCELING)
999 for i in job.ops[opctx.index + 1:])
1000
1001 assert op.status in (constants.OP_STATUS_QUEUED,
1002 constants.OP_STATUS_WAITING,
1003 constants.OP_STATUS_CANCELING)
1004
1005 assert (op.priority <= constants.OP_PRIO_LOWEST and
1006 op.priority >= constants.OP_PRIO_HIGHEST)
1007
1008 waitjob = None
1009
1010 if op.status != constants.OP_STATUS_CANCELING:
1011 assert op.status in (constants.OP_STATUS_QUEUED,
1012 constants.OP_STATUS_WAITING)
1013
1014
1015 if self._MarkWaitlock(job, op):
1016
1017 queue.UpdateJobUnlocked(job)
1018
1019 assert op.status == constants.OP_STATUS_WAITING
1020 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1021 assert job.start_timestamp and op.start_timestamp
1022 assert waitjob is None
1023
1024
1025 waitjob = self._CheckDependencies(queue, job, opctx)
1026
1027 assert op.status in (constants.OP_STATUS_WAITING,
1028 constants.OP_STATUS_CANCELING,
1029 constants.OP_STATUS_ERROR)
1030
1031 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1032 constants.OP_STATUS_ERROR)):
1033 logging.info("%s: opcode %s waiting for locks",
1034 opctx.log_prefix, opctx.summary)
1035
1036 assert not opctx.jobdeps, "Not all dependencies were removed"
1037
1038 queue.release()
1039 try:
1040 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1041 finally:
1042 queue.acquire(shared=1)
1043
1044 op.status = op_status
1045 op.result = op_result
1046
1047 assert not waitjob
1048
1049 if op.status in (constants.OP_STATUS_WAITING,
1050 constants.OP_STATUS_QUEUED):
1051
1052
1053 assert not op.end_timestamp
1054 else:
1055
1056 op.end_timestamp = TimeStampNow()
1057
1058 if op.status == constants.OP_STATUS_CANCELING:
1059 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1060 for i in job.ops[opctx.index:])
1061 else:
1062 assert op.status in constants.OPS_FINALIZED
1063
1064 if op.status == constants.OP_STATUS_QUEUED:
1065
1066 assert not waitjob
1067
1068 finalize = False
1069
1070
1071 job.cur_opctx = None
1072
1073
1074 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1075
1076 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1077 finalize = False
1078
1079 if not waitjob and opctx.CheckPriorityIncrease():
1080
1081 queue.UpdateJobUnlocked(job)
1082
1083
1084 job.cur_opctx = opctx
1085
1086 assert (op.priority <= constants.OP_PRIO_LOWEST and
1087 op.priority >= constants.OP_PRIO_HIGHEST)
1088
1089
1090 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1091
1092 else:
1093
1094 assert (opctx.index == 0 or
1095 compat.all(i.status == constants.OP_STATUS_SUCCESS
1096 for i in job.ops[:opctx.index]))
1097
1098
1099 job.cur_opctx = None
1100
1101 if op.status == constants.OP_STATUS_SUCCESS:
1102 finalize = False
1103
1104 elif op.status == constants.OP_STATUS_ERROR:
1105
1106
1107
1108
1109 to_encode = errors.OpExecError("Preceding opcode failed")
1110 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1111 _EncodeOpError(to_encode))
1112 finalize = True
1113 elif op.status == constants.OP_STATUS_CANCELING:
1114 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1115 "Job canceled by request")
1116 finalize = True
1117
1118 else:
1119 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1120
1121 if opctx.index == (opcount - 1):
1122
1123 finalize = True
1124
1125 if finalize:
1126
1127 job.Finalize()
1128
1129
1130
1131 queue.UpdateJobUnlocked(job)
1132
1133 assert not waitjob
1134
1135 if finalize:
1136 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1137 return self.FINISHED
1138
1139 assert not waitjob or queue.depmgr.JobWaiting(job)
1140
1141 if waitjob:
1142 return self.WAITDEP
1143 else:
1144 return self.DEFER
1145 finally:
1146 assert job.writable, "Job became read-only while being processed"
1147 queue.release()
1148
1171
1174 """The actual job workers.
1175
1176 """
1178 """Job executor.
1179
1180 @type job: L{_QueuedJob}
1181 @param job: the job to be processed
1182
1183 """
1184 assert job.writable, "Expected writable job"
1185
1186
1187
1188
1189 job.processor_lock.acquire()
1190 try:
1191 return self._RunTaskInner(job)
1192 finally:
1193 job.processor_lock.release()
1194
1215
1216 @staticmethod
1218 """Updates the worker thread name to include a short summary of the opcode.
1219
1220 @param setname_fn: Callable setting worker thread name
1221 @param execop_fn: Callable for executing opcode (usually
1222 L{mcpu.Processor.ExecOpCode})
1223
1224 """
1225 setname_fn(op)
1226 try:
1227 return execop_fn(op, *args, **kwargs)
1228 finally:
1229 setname_fn(None)
1230
1231 @staticmethod
1233 """Sets the worker thread name.
1234
1235 @type job: L{_QueuedJob}
1236 @type op: L{opcodes.OpCode}
1237
1238 """
1239 parts = ["Job%s" % job.id]
1240
1241 if op:
1242 parts.append(op.TinySummary())
1243
1244 return "/".join(parts)
1245
1248 """Simple class implementing a job-processing workerpool.
1249
1250 """
1256
class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  # Possible results of L{CheckAndRegister}
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    @param getstatus_fn: callable returning the status of a job by ID
    @param enqueue_fn: callable re-adding a set of jobs to the workerpool

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    # Maps dependency job IDs to sets of waiting jobs
    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested):
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up
    # as one item.
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost as err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
1392
class JobQueue(object):
  """Queue used to manage the jobs.

  """
def __init__(self, context, cfg):
  """Constructor for JobQueue.

  The constructor will initialize the job queue object and then
  start loading the current jobs from disk, either for starting them
  (if they were queue) or for aborting them (if they were already
  running).

  @type context: GanetiContext
  @param context: the context object for access to the configuration
      data and other ganeti objects

  """
  # NOTE(review): the second parameter (cfg) is reconstructed from the body
  # (cfg.GetAllNodesInfo() below) -- confirm name/order against callers
  self.primary_jid = None
  self.context = context
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = netutils.Hostname.GetSysName()

  # The single, large JobQueue lock (see module docstring)
  self._lock = locking.SharedLock("JobQueue")

  self.acquire = self._lock.acquire
  self.release = self._lock.release

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Get initial list of nodes (master candidates only)
  self._nodes = dict((n.name, n.primary_ip)
                     for n in cfg.GetAllNodesInfo().values()
                     if n.master_candidate)

  # Remove master node; the queue is replicated to the other nodes only
  self._nodes.pop(self._my_hostname, None)

  # Job queue size, initialized by _UpdateQueueSizeUnlocked
  self._queue_size = None
  self._UpdateQueueSizeUnlocked()
  assert ht.TInt(self._queue_size)

  # Job dependencies
  self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                      self._EnqueueJobs)

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
1451
def PickupJob(self, job_id):
  """Load a job from the job queue

  Pick up a job that already is in the job queue and start/resume it.

  """
  if self.primary_jid:
    logging.warning("Job process asked to pick up %s, but already has %s",
                    job_id, self.primary_jid)

  self.primary_jid = int(job_id)

  job = self._LoadJobUnlocked(job_id)

  if job is None:
    logging.warning("Job %s could not be read", job_id)
    return

  job.AddReasons(pickup=True)

  status = job.CalcStatus()
  if status == constants.JOB_STATUS_QUEUED:
    job.SetPid(os.getpid())
    self._EnqueueJobsUnlocked([job])
    logging.info("Restarting job %s", job.id)

  elif status in (constants.JOB_STATUS_RUNNING,
                  constants.JOB_STATUS_WAITING,
                  constants.JOB_STATUS_CANCELING):
    logging.warning("Unfinished job %s found: %s", job.id, job)

    if status == constants.JOB_STATUS_WAITING:
      # Job was waiting for locks, it can safely be requeued
      job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
      job.SetPid(os.getpid())
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)
    else:
      # Job was running; mark remaining opcodes as failed
      to_encode = errors.OpExecError("Unclean master daemon shutdown")
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            _EncodeOpError(to_encode))
      job.Finalize()

  self.UpdateJobUnlocked(job)
1495
1496 @locking.ssynchronized(_LOCK)
1499
def _GetRpc(self, address_list):
  """Gets RPC runner with context.

  @param address_list: explicit node address list, or None to resolve
      addresses normally

  """
  return rpc.JobQueueRunner(self.context, address_list)
1505
@locking.ssynchronized(_LOCK)
def AddNode(self, node):
  """Register a new node with the queue.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  node_name = node.name
  assert node_name != self._my_hostname

  # Clean queue directory on added node
  result = self._GetRpc(None).call_jobqueue_purge(node_name)
  msg = result.fail_msg
  if msg:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    node_name, msg)

  if not node.master_candidate:
    # The queue is only replicated to master candidates; remove if present
    self._nodes.pop(node_name, None)
    return

  # Upload the whole queue excluding archived jobs
  files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

  # Upload current serial file
  files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

  # Static address list
  addrs = [node.primary_ip]

  for file_name in files:
    # Read file content
    content = utils.ReadFile(file_name)

    result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                           file_name, content)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to upload file %s to node %s: %s",
                    file_name, node_name, msg)

    # NOTE(review): this re-reads the same fail_msg as above but logs a
    # message about the queue drained flag although no drain-flag RPC is
    # made here; looks like a leftover of removed code -- confirm
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

  self._nodes[node_name] = node.primary_ip
1556
1557 @locking.ssynchronized(_LOCK)
1559 """Callback called when removing nodes from the cluster.
1560
1561 @type node_name: str
1562 @param node_name: the name of the node to remove
1563
1564 """
1565 self._nodes.pop(node_name, None)
1566
1567 @staticmethod
1569 """Verifies the status of an RPC call.
1570
1571 Since we aim to keep consistency should this node (the current
1572 master) fail, we will log errors if our rpc fail, and especially
1573 log the case when more than half of the nodes fails.
1574
1575 @param result: the data as returned from the rpc call
1576 @type nodes: list
1577 @param nodes: the list of nodes we made the call to
1578 @type failmsg: str
1579 @param failmsg: the identifier to be used for logging
1580
1581 """
1582 failed = []
1583 success = []
1584
1585 for node in nodes:
1586 msg = result[node].fail_msg
1587 if msg:
1588 failed.append(node)
1589 logging.error("RPC call %s (%s) failed on node %s: %s",
1590 result[node].call, failmsg, node, msg)
1591 else:
1592 success.append(node)
1593
1594
1595 if (len(success) + 1) < len(failed):
1596
1597 logging.error("More than half of the nodes failed")
1598
1600 """Helper for returning the node name/ip list.
1601
1602 @rtype: (list, list)
1603 @return: a tuple of two lists, the first one with the node
1604 names and the second one with the node addresses
1605
1606 """
1607
1608 name_list = self._nodes.keys()
1609 addr_list = [self._nodes[name] for name in name_list]
1610 return name_list, addr_list
1611
1613 """Writes a file locally and then replicates it to all nodes.
1614
1615 This function will replace the contents of a file on the local
1616 node and then replicate it to all the other nodes we have.
1617
1618 @type file_name: str
1619 @param file_name: the path of the file to be replicated
1620 @type data: str
1621 @param data: the new contents of the file
1622 @type replicate: boolean
1623 @param replicate: whether to spread the changes to the remote nodes
1624
1625 """
1626 getents = runtime.GetEnts()
1627 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1628 gid=getents.daemons_gid,
1629 mode=constants.JOB_QUEUE_FILES_PERMS)
1630
1631 if replicate:
1632 names, addrs = self._GetNodeIp()
1633 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1634 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1635
1637 """Renames a file locally and then replicate the change.
1638
1639 This function will rename a file in the local queue directory
1640 and then replicate this rename to all the other nodes we have.
1641
1642 @type rename: list of (old, new)
1643 @param rename: List containing tuples mapping old to new names
1644
1645 """
1646
1647 for old, new in rename:
1648 utils.RenameFile(old, new, mkdir=True)
1649
1650
1651 names, addrs = self._GetNodeIp()
1652 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1653 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1654
1655 @staticmethod
1657 """Returns the job file for a given job id.
1658
1659 @type job_id: str
1660 @param job_id: the job identifier
1661 @rtype: str
1662 @return: the path to the job file
1663
1664 """
1665 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1666
1667 @staticmethod
1669 """Returns the archived job file for a give job id.
1670
1671 @type job_id: str
1672 @param job_id: the job identifier
1673 @rtype: str
1674 @return: the path to the archived job file
1675
1676 """
1677 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1678 jstore.GetArchiveDirectory(job_id),
1679 "job-%s" % job_id)
1680
1681 @staticmethod
1683 """Build list of directories containing job files.
1684
1685 @type archived: bool
1686 @param archived: Whether to include directories for archived jobs
1687 @rtype: list
1688
1689 """
1690 result = [pathutils.QUEUE_DIR]
1691
1692 if archived:
1693 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1694 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1695 utils.ListVisibleFiles(archive_path)))
1696
1697 return result
1698
1699 @classmethod
1701 """Return all known job IDs.
1702
1703 The method only looks at disk because it's a requirement that all
1704 jobs are present on disk (so in the _memcache we don't have any
1705 extra IDs).
1706
1707 @type sort: boolean
1708 @param sort: perform sorting on the returned job ids
1709 @rtype: list
1710 @return: the list of job IDs
1711
1712 """
1713 jlist = []
1714
1715 for path in cls._DetermineJobDirectories(archived):
1716 for filename in utils.ListVisibleFiles(path):
1717 m = constants.JOB_FILE_RE.match(filename)
1718 if m:
1719 jlist.append(int(m.group(1)))
1720
1721 if sort:
1722 jlist.sort()
1723 return jlist
1724
1726 """Loads a job from the disk or memory.
1727
1728 Given a job id, this will return the cached job object if
1729 existing, or try to load the job from the disk. If loading from
1730 disk, it will also add the job to the cache.
1731
1732 @type job_id: int
1733 @param job_id: the job id
1734 @rtype: L{_QueuedJob} or None
1735 @return: either None or the job object
1736
1737 """
1738 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1739
1740 job = self._memcache.get(job_id, None)
1741 if job:
1742 logging.debug("Found job %s in memcache", job_id)
1743 assert job.writable, "Found read-only job in memcache"
1744 return job
1745
1746 try:
1747 job = self._LoadJobFromDisk(job_id, False)
1748 if job is None:
1749 return job
1750 except errors.JobFileCorrupted:
1751 old_path = self._GetJobPath(job_id)
1752 new_path = self._GetArchivedJobPath(job_id)
1753 if old_path == new_path:
1754
1755 logging.exception("Can't parse job %s", job_id)
1756 else:
1757
1758 logging.exception("Can't parse job %s, will archive.", job_id)
1759 self._RenameFilesUnlocked([(old_path, new_path)])
1760 return None
1761
1762 assert job.writable, "Job just loaded is not writable"
1763
1764 self._memcache[job_id] = job
1765 logging.debug("Added job %s to the cache", job_id)
1766 return job
1767
1769 """Load the given job file from disk.
1770
1771 Given a job file, read, load and restore it in a _QueuedJob format.
1772
1773 @type job_id: int
1774 @param job_id: job identifier
1775 @type try_archived: bool
1776 @param try_archived: Whether to try loading an archived job
1777 @rtype: L{_QueuedJob} or None
1778 @return: either None or the job object
1779
1780 """
1781 path_functions = [(self._GetJobPath, False)]
1782
1783 if try_archived:
1784 path_functions.append((self._GetArchivedJobPath, True))
1785
1786 raw_data = None
1787 archived = None
1788
1789 for (fn, archived) in path_functions:
1790 filepath = fn(job_id)
1791 logging.debug("Loading job from %s", filepath)
1792 try:
1793 raw_data = utils.ReadFile(filepath)
1794 except EnvironmentError, err:
1795 if err.errno != errno.ENOENT:
1796 raise
1797 else:
1798 break
1799
1800 if not raw_data:
1801 logging.debug("No data available for job %s", job_id)
1802 if int(job_id) == self.primary_jid:
1803 logging.warning("My own job file (%s) disappeared;"
1804 " this should only happy at cluster desctruction",
1805 job_id)
1806 if mcpu.lusExecuting[0] == 0:
1807 logging.warning("Not in execution; cleaning up myself due to missing"
1808 " job file")
1809 logging.shutdown()
1810 os._exit(1)
1811 return None
1812
1813 if writable is None:
1814 writable = not archived
1815
1816 try:
1817 data = serializer.LoadJson(raw_data)
1818 job = _QueuedJob.Restore(self, data, writable, archived)
1819 except Exception, err:
1820 raise errors.JobFileCorrupted(err)
1821
1822 return job
1823
1825 """Load the given job file from disk.
1826
1827 Given a job file, read, load and restore it in a _QueuedJob format.
1828 In case of error reading the job, it gets returned as None, and the
1829 exception is logged.
1830
1831 @type job_id: int
1832 @param job_id: job identifier
1833 @type try_archived: bool
1834 @param try_archived: Whether to try loading an archived job
1835 @rtype: L{_QueuedJob} or None
1836 @return: either None or the job object
1837
1838 """
1839 try:
1840 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1841 except (errors.JobFileCorrupted, EnvironmentError):
1842 logging.exception("Can't load/parse job %s", job_id)
1843 return None
1844
1846 """Update the queue size.
1847
1848 """
1849 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1850
1851 @classmethod
1857
1858 @staticmethod
1865
1866 @staticmethod
1868 """Resolves relative job IDs in dependencies.
1869
1870 @type resolve_fn: callable
1871 @param resolve_fn: Function to resolve a relative job ID
1872 @type deps: list
1873 @param deps: Dependencies
1874 @rtype: tuple; (boolean, string or list)
1875 @return: If successful (first tuple item), the returned list contains
1876 resolved job IDs along with the requested status; if not successful,
1877 the second element is an error message
1878
1879 """
1880 result = []
1881
1882 for (dep_job_id, dep_status) in deps:
1883 if ht.TRelativeJobId(dep_job_id):
1884 assert ht.TInt(dep_job_id) and dep_job_id < 0
1885 try:
1886 job_id = resolve_fn(dep_job_id)
1887 except IndexError:
1888
1889 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
1890 else:
1891 job_id = dep_job_id
1892
1893 result.append((job_id, dep_status))
1894
1895 return (True, result)
1896
1897 @locking.ssynchronized(_LOCK)
1899 """Helper function to add jobs to worker pool's queue.
1900
1901 @type jobs: list
1902 @param jobs: List of all jobs
1903
1904 """
1905 return self._EnqueueJobsUnlocked(jobs)
1906
1908 """Helper function to add jobs to worker pool's queue.
1909
1910 @type jobs: list
1911 @param jobs: List of all jobs
1912
1913 """
1914 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
1915 self._wpool.AddManyTasks([(job, ) for job in jobs],
1916 priority=[job.CalcPriority() for job in jobs],
1917 task_id=map(_GetIdAttr, jobs))
1918
1920 """Gets the status of a job for dependencies.
1921
1922 @type job_id: int
1923 @param job_id: Job ID
1924 @raise errors.JobLost: If job can't be found
1925
1926 """
1927
1928
1929
1930 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1931
1932 if job:
1933 assert not job.writable, "Got writable job"
1934
1935 if job:
1936 return job.CalcStatus()
1937
1938 raise errors.JobLost("Job %s not found" % job_id)
1939
1941 """Update a job's on disk storage.
1942
1943 After a job has been modified, this function needs to be called in
1944 order to write the changes to disk and replicate them to the other
1945 nodes.
1946
1947 @type job: L{_QueuedJob}
1948 @param job: the changed job
1949 @type replicate: boolean
1950 @param replicate: whether to replicate the change to remote nodes
1951
1952 """
1953 if __debug__:
1954 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1955 assert (finalized ^ (job.end_timestamp is None))
1956 assert job.writable, "Can't update read-only job"
1957 assert not job.archived, "Can't update archived job"
1958
1959 filename = self._GetJobPath(job.id)
1960 data = serializer.DumpJson(job.Serialize())
1961 logging.debug("Writing job %s to %s", job.id, filename)
1962 self._UpdateJobQueueFile(filename, data, replicate)
1963
1965 """Checks if a job has been finalized.
1966
1967 @type job_id: int
1968 @param job_id: Job identifier
1969 @rtype: boolean
1970 @return: True if the job has been finalized,
1971 False if the timeout has been reached,
1972 None if the job doesn't exist
1973
1974 """
1975 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1976 if job is not None:
1977 return job.CalcStatus() in constants.JOBS_FINALIZED
1978 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
1979
1980
1981
1982 return True
1983 else:
1984 return None
1985
1986 @locking.ssynchronized(_LOCK)
1988 """Cancels a job.
1989
1990 This will only succeed if the job has not started yet.
1991
1992 @type job_id: int
1993 @param job_id: job ID of job to be cancelled.
1994
1995 """
1996 logging.info("Cancelling job %s", job_id)
1997
1998 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1999
2000 @locking.ssynchronized(_LOCK)
2002 """Changes a job's priority.
2003
2004 @type job_id: int
2005 @param job_id: ID of the job whose priority should be changed
2006 @type priority: int
2007 @param priority: New priority
2008
2009 """
2010 logging.info("Changing priority of job %s to %s", job_id, priority)
2011
2012 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2013 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2014 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2015 (priority, allowed))
2016
2017 def fn(job):
2018 (success, msg) = job.ChangePriority(priority)
2019
2020 if success:
2021 try:
2022 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2023 except workerpool.NoSuchTask:
2024 logging.debug("Job %s is not in workerpool at this time", job.id)
2025
2026 return (success, msg)
2027
2028 return self._ModifyJobUnlocked(job_id, fn)
2029
2031 """Modifies a job.
2032
2033 @type job_id: int
2034 @param job_id: Job ID
2035 @type mod_fn: callable
2036 @param mod_fn: Modifying function, receiving job object as parameter,
2037 returning tuple of (status boolean, message string)
2038
2039 """
2040 job = self._LoadJobUnlocked(job_id)
2041 if not job:
2042 logging.debug("Job %s not found", job_id)
2043 return (False, "Job %s not found" % job_id)
2044
2045 assert job.writable, "Can't modify read-only job"
2046 assert not job.archived, "Can't modify archived job"
2047
2048 (success, msg) = mod_fn(job)
2049
2050 if success:
2051
2052
2053 self.UpdateJobUnlocked(job)
2054
2055 return (success, msg)
2056
2058 """Archives jobs.
2059
2060 @type jobs: list of L{_QueuedJob}
2061 @param jobs: Job objects
2062 @rtype: int
2063 @return: Number of archived jobs
2064
2065 """
2066 archive_jobs = []
2067 rename_files = []
2068 for job in jobs:
2069 assert job.writable, "Can't archive read-only job"
2070 assert not job.archived, "Can't cancel archived job"
2071
2072 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2073 logging.debug("Job %s is not yet done", job.id)
2074 continue
2075
2076 archive_jobs.append(job)
2077
2078 old = self._GetJobPath(job.id)
2079 new = self._GetArchivedJobPath(job.id)
2080 rename_files.append((old, new))
2081
2082
2083 self._RenameFilesUnlocked(rename_files)
2084
2085 logging.debug("Successfully archived job(s) %s",
2086 utils.CommaJoin(job.id for job in archive_jobs))
2087
2088
2089
2090
2091
2092 self._UpdateQueueSizeUnlocked()
2093 return len(archive_jobs)
2094
2095 - def _Query(self, fields, qfilter):
2096 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2097 namefield="id")
2098
2099
2100
2101
2102 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2103
2104 job_ids = qobj.RequestedNames()
2105
2106 list_all = (job_ids is None)
2107
2108 if list_all:
2109
2110
2111 job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2112
2113 jobs = []
2114
2115 for job_id in job_ids:
2116 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2117 if job is not None or not list_all:
2118 jobs.append((job_id, job))
2119
2120 return (qobj, jobs, list_all)
2121
2123 """Returns a list of jobs in queue.
2124
2125 @type fields: sequence
2126 @param fields: List of wanted fields
2127 @type qfilter: None or query2 filter (list)
2128 @param qfilter: Query filter
2129
2130 """
2131 (qobj, ctx, _) = self._Query(fields, qfilter)
2132
2133 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2134
2136 """Returns a list of jobs in queue.
2137
2138 @type job_ids: list
2139 @param job_ids: sequence of job identifiers or None for all
2140 @type fields: list
2141 @param fields: names of fields to return
2142 @rtype: list
2143 @return: list one element per job, each element being list with
2144 the requested fields
2145
2146 """
2147
2148 job_ids = [int(jid) for jid in job_ids]
2149 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2150
2151 (qobj, ctx, _) = self._Query(fields, qfilter)
2152
2153 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2154
2155 @locking.ssynchronized(_LOCK)
2157 """Prepare to stop the job queue.
2158
2159 Returns whether there are any jobs currently running. If the latter is the
2160 case, the job queue is not yet ready for shutdown. Once this function
2161 returns C{True} L{Shutdown} can be called without interfering with any job.
2162
2163 @rtype: bool
2164 @return: Whether there are any running jobs
2165
2166 """
2167 return self._wpool.HasRunningTasks()
2168
2169 @locking.ssynchronized(_LOCK)
2171 """Stops the job queue.
2172
2173 This shutdowns all the worker threads an closes the queue.
2174
2175 """
2176 self._wpool.TerminateWorkers()
2177