1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48 import os
49
50 try:
51
52 from pyinotify import pyinotify
53 except ImportError:
54 import pyinotify
55
56 from ganeti import asyncnotifier
57 from ganeti import constants
58 from ganeti import serializer
59 from ganeti import workerpool
60 from ganeti import locking
61 from ganeti import luxi
62 from ganeti import opcodes
63 from ganeti import opcodes_base
64 from ganeti import errors
65 from ganeti import mcpu
66 from ganeti import utils
67 from ganeti import jstore
68 import ganeti.rpc.node as rpc
69 from ganeti import runtime
70 from ganeti import netutils
71 from ganeti import compat
72 from ganeti import ht
73 from ganeti import query
74 from ganeti import qlang
75 from ganeti import pathutils
76 from ganeti import vcluster
77 from ganeti.cmdlib import cluster
78
79
80 JOBQUEUE_THREADS = 1
81
82
83 _LOCK = "_lock"
84 _QUEUE = "_queue"
85
86
87 _GetIdAttr = operator.attrgetter("id")
91 """Special exception to cancel a job.
92
93 """
94
97 """Returns the current timestamp.
98
99 @rtype: tuple
100 @return: the current time in the (seconds, microseconds) format
101
102 """
103 return utils.SplitTime(time.time())
104
112
115 """Encapsulates an opcode object.
116
117 @ivar log: holds the execution log and consists of tuples
118 of the form C{(log_serial, timestamp, level, message)}
119 @ivar input: the OpCode we encapsulate
120 @ivar status: the current status
121 @ivar result: the result of the LU execution
122 @ivar start_timestamp: timestamp for the start of the execution
123 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
124 @ivar stop_timestamp: timestamp for the end of the execution
125
126 """
127 __slots__ = ["input", "status", "result", "log", "priority",
128 "start_timestamp", "exec_timestamp", "end_timestamp",
129 "__weakref__"]
130
132 """Initializes instances of this class.
133
134 @type op: L{opcodes.OpCode}
135 @param op: the opcode we encapsulate
136
137 """
138 self.input = op
139 self.status = constants.OP_STATUS_QUEUED
140 self.result = None
141 self.log = []
142 self.start_timestamp = None
143 self.exec_timestamp = None
144 self.end_timestamp = None
145
146
147 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
148
149 @classmethod
151 """Restore the _QueuedOpCode from the serialized form.
152
153 @type state: dict
154 @param state: the serialized state
155 @rtype: _QueuedOpCode
156 @return: a new _QueuedOpCode instance
157
158 """
159 obj = _QueuedOpCode.__new__(cls)
160 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
161 obj.status = state["status"]
162 obj.result = state["result"]
163 obj.log = state["log"]
164 obj.start_timestamp = state.get("start_timestamp", None)
165 obj.exec_timestamp = state.get("exec_timestamp", None)
166 obj.end_timestamp = state.get("end_timestamp", None)
167 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
168 return obj
169
171 """Serializes this _QueuedOpCode.
172
173 @rtype: dict
174 @return: the dictionary holding the serialized state
175
176 """
177 return {
178 "input": self.input.__getstate__(),
179 "status": self.status,
180 "result": self.result,
181 "log": self.log,
182 "start_timestamp": self.start_timestamp,
183 "exec_timestamp": self.exec_timestamp,
184 "end_timestamp": self.end_timestamp,
185 "priority": self.priority,
186 }
187
190 """In-memory job representation.
191
192 This is what we use to track the user-submitted jobs. Locking must
193 be taken care of by users of this class.
194
195 @type queue: L{JobQueue}
196 @ivar queue: the parent queue
197 @ivar id: the job ID
198 @type ops: list
199 @ivar ops: the list of _QueuedOpCode that constitute the job
200 @type log_serial: int
201 @ivar log_serial: holds the index for the next log entry
202 @ivar received_timestamp: the timestamp for when the job was received
203 @ivar start_timestmap: the timestamp for start of execution
204 @ivar end_timestamp: the timestamp for end of execution
205 @ivar writable: Whether the job is allowed to be modified
206
207 """
208
209 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
210 "received_timestamp", "start_timestamp", "end_timestamp",
211 "processor_lock", "writable", "archived",
212 "livelock", "process_id",
213 "__weakref__"]
214
216 """Extend the reason trail
217
218 Add the reason for all the opcodes of this job to be executed.
219
220 """
221 count = 0
222 for queued_op in self.ops:
223 op = queued_op.input
224 if pickup:
225 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
226 else:
227 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
228 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
229 reason_src_prefix)
230 reason_text = "job=%d;index=%d" % (self.id, count)
231 reason = getattr(op, "reason", [])
232 reason.append((reason_src, reason_text, utils.EpochNano()))
233 op.reason = reason
234 count = count + 1
235
236 - def __init__(self, queue, job_id, ops, writable):
237 """Constructor for the _QueuedJob.
238
239 @type queue: L{JobQueue}
240 @param queue: our parent queue
241 @type job_id: job_id
242 @param job_id: our job id
243 @type ops: list
244 @param ops: the list of opcodes we hold, which will be encapsulated
245 in _QueuedOpCodes
246 @type writable: bool
247 @param writable: Whether job can be modified
248
249 """
250 if not ops:
251 raise errors.GenericError("A job needs at least one opcode")
252
253 self.queue = queue
254 self.id = int(job_id)
255 self.ops = [_QueuedOpCode(op) for op in ops]
256 self.AddReasons()
257 self.log_serial = 0
258 self.received_timestamp = TimeStampNow()
259 self.start_timestamp = None
260 self.end_timestamp = None
261 self.archived = False
262 self.livelock = None
263 self.process_id = None
264
265 self._InitInMemory(self, writable)
266
267 assert not self.archived, "New jobs can not be marked as archived"
268
269 @staticmethod
271 """Initializes in-memory variables.
272
273 """
274 obj.writable = writable
275 obj.ops_iter = None
276 obj.cur_opctx = None
277
278
279 if writable:
280 obj.processor_lock = threading.Lock()
281 else:
282 obj.processor_lock = None
283
285 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
286 "id=%s" % self.id,
287 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
288
289 return "<%s at %#x>" % (" ".join(status), id(self))
290
291 @classmethod
292 - def Restore(cls, queue, state, writable, archived):
293 """Restore a _QueuedJob from serialized state:
294
295 @type queue: L{JobQueue}
296 @param queue: to which queue the restored job belongs
297 @type state: dict
298 @param state: the serialized state
299 @type writable: bool
300 @param writable: Whether job can be modified
301 @type archived: bool
302 @param archived: Whether job was already archived
303 @rtype: _JobQueue
304 @return: the restored _JobQueue instance
305
306 """
307 obj = _QueuedJob.__new__(cls)
308 obj.queue = queue
309 obj.id = int(state["id"])
310 obj.received_timestamp = state.get("received_timestamp", None)
311 obj.start_timestamp = state.get("start_timestamp", None)
312 obj.end_timestamp = state.get("end_timestamp", None)
313 obj.archived = archived
314 obj.livelock = state.get("livelock", None)
315 obj.process_id = state.get("process_id", None)
316 if obj.process_id is not None:
317 obj.process_id = int(obj.process_id)
318
319 obj.ops = []
320 obj.log_serial = 0
321 for op_state in state["ops"]:
322 op = _QueuedOpCode.Restore(op_state)
323 for log_entry in op.log:
324 obj.log_serial = max(obj.log_serial, log_entry[0])
325 obj.ops.append(op)
326
327 cls._InitInMemory(obj, writable)
328
329 return obj
330
332 """Serialize the _JobQueue instance.
333
334 @rtype: dict
335 @return: the serialized state
336
337 """
338 return {
339 "id": self.id,
340 "ops": [op.Serialize() for op in self.ops],
341 "start_timestamp": self.start_timestamp,
342 "end_timestamp": self.end_timestamp,
343 "received_timestamp": self.received_timestamp,
344 "livelock": self.livelock,
345 "process_id": self.process_id,
346 }
347
400
402 """Gets the current priority for this job.
403
404 Only unfinished opcodes are considered. When all are done, the default
405 priority is used.
406
407 @rtype: int
408
409 """
410 priorities = [op.priority for op in self.ops
411 if op.status not in constants.OPS_FINALIZED]
412
413 if not priorities:
414
415 return constants.OP_PRIO_DEFAULT
416
417 return min(priorities)
418
420 """Selectively returns the log entries.
421
422 @type newer_than: None or int
423 @param newer_than: if this is None, return all log entries,
424 otherwise return only the log entries with serial higher
425 than this value
426 @rtype: list
427 @return: the list of the log entries selected
428
429 """
430 if newer_than is None:
431 serial = -1
432 else:
433 serial = newer_than
434
435 entries = []
436 for op in self.ops:
437 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
438
439 return entries
440
442 """Mark unfinished opcodes with a given status and result.
443
444 This is an utility function for marking all running or waiting to
445 be run opcodes with a given status. Opcodes which are already
446 finalised are not changed.
447
448 @param status: a given opcode status
449 @param result: the opcode result
450
451 """
452 not_marked = True
453 for op in self.ops:
454 if op.status in constants.OPS_FINALIZED:
455 assert not_marked, "Finalized opcodes found after non-finalized ones"
456 continue
457 op.status = status
458 op.result = result
459 not_marked = False
460
462 """Marks the job as finalized.
463
464 """
465 self.end_timestamp = TimeStampNow()
466
491
493 """Changes the job priority.
494
495 @type priority: int
496 @param priority: New priority
497 @rtype: tuple; (bool, string)
498 @return: Boolean describing whether job's priority was successfully changed
499 and a text message
500
501 """
502 status = self.CalcStatus()
503
504 if status in constants.JOBS_FINALIZED:
505 return (False, "Job %s is finished" % self.id)
506 elif status == constants.JOB_STATUS_CANCELING:
507 return (False, "Job %s is cancelling" % self.id)
508 else:
509 assert status in (constants.JOB_STATUS_QUEUED,
510 constants.JOB_STATUS_WAITING,
511 constants.JOB_STATUS_RUNNING)
512
513 changed = False
514 for op in self.ops:
515 if (op.status == constants.OP_STATUS_RUNNING or
516 op.status in constants.OPS_FINALIZED):
517 assert not changed, \
518 ("Found opcode for which priority should not be changed after"
519 " priority has been changed for previous opcodes")
520 continue
521
522 assert op.status in (constants.OP_STATUS_QUEUED,
523 constants.OP_STATUS_WAITING)
524
525 changed = True
526
527
528 op.priority = priority
529
530 if changed:
531 return (True, ("Priorities of pending opcodes for job %s have been"
532 " changed to %s" % (self.id, priority)))
533 else:
534 return (False, "Job %s had no pending opcodes" % self.id)
535
537 """Sets the job's process ID
538
539 @type pid: int
540 @param pid: the process ID
541
542 """
543 status = self.CalcStatus()
544
545 if status in (constants.JOB_STATUS_QUEUED,
546 constants.JOB_STATUS_WAITING):
547 if self.process_id is not None:
548 logging.warning("Replacing the process id %s of job %s with %s",
549 self.process_id, self.id, pid)
550 self.process_id = pid
551 else:
552 logging.warning("Can set pid only for queued/waiting jobs")
553
556
558 """Initializes this class.
559
560 @type queue: L{JobQueue}
561 @param queue: Job queue
562 @type job: L{_QueuedJob}
563 @param job: Job object
564 @type op: L{_QueuedOpCode}
565 @param op: OpCode
566
567 """
568 super(_OpExecCallbacks, self).__init__()
569
570 assert queue, "Queue is missing"
571 assert job, "Job is missing"
572 assert op, "Opcode is missing"
573
574 self._queue = queue
575 self._job = job
576 self._op = op
577
579 """Raises an exception to cancel the job if asked to.
580
581 """
582
583 if self._op.status == constants.OP_STATUS_CANCELING:
584 logging.debug("Canceling opcode")
585 raise CancelJob()
586
587 @locking.ssynchronized(_QUEUE, shared=1)
589 """Mark the opcode as running, not lock-waiting.
590
591 This is called from the mcpu code as a notifier function, when the LU is
592 finally about to start the Exec() method. Of course, to have end-user
593 visible results, the opcode must be initially (before calling into
594 Processor.ExecOpCode) set to OP_STATUS_WAITING.
595
596 """
597 assert self._op in self._job.ops
598 assert self._op.status in (constants.OP_STATUS_WAITING,
599 constants.OP_STATUS_CANCELING)
600
601
602 self._CheckCancel()
603
604 logging.debug("Opcode is now running")
605
606 self._op.status = constants.OP_STATUS_RUNNING
607 self._op.exec_timestamp = TimeStampNow()
608
609
610 self._queue.UpdateJobUnlocked(self._job)
611
612 @locking.ssynchronized(_QUEUE, shared=1)
614 """Internal feedback append function, with locks
615
616 """
617 self._job.log_serial += 1
618 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
619 self._queue.UpdateJobUnlocked(self._job, replicate=False)
620
637
649
651 """Submits jobs for processing.
652
653 See L{JobQueue.SubmitManyJobs}.
654
655 """
656
657 return self._queue.SubmitManyJobs(jobs)
658
670
674 """Initializes this class.
675
676 """
677 self._fn = fn
678 self._next = None
679
681 """Gets the next timeout if necessary.
682
683 """
684 if self._next is None:
685 self._next = self._fn()
686
688 """Returns the next timeout.
689
690 """
691 self._Advance()
692 return self._next
693
695 """Returns the current timeout and advances the internal state.
696
697 """
698 self._Advance()
699 result = self._next
700 self._next = None
701 return result
702
703
704 -class _OpExecContext:
705 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
706 """Initializes this class.
707
708 """
709 self.op = op
710 self.index = index
711 self.log_prefix = log_prefix
712 self.summary = op.input.Summary()
713
714
715 if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
716 self.jobdeps = op.input.depends[:]
717 else:
718 self.jobdeps = None
719
720 self._timeout_strategy_factory = timeout_strategy_factory
721 self._ResetTimeoutStrategy()
722
724 """Creates a new timeout strategy.
725
726 """
727 self._timeout_strategy = \
728 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
729
731 """Checks whether priority can and should be increased.
732
733 Called when locks couldn't be acquired.
734
735 """
736 op = self.op
737
738
739
740 if (self._timeout_strategy.Peek() is None and
741 op.priority > constants.OP_PRIO_HIGHEST):
742 logging.debug("Increasing priority")
743 op.priority -= 1
744 self._ResetTimeoutStrategy()
745 return True
746
747 return False
748
750 """Returns the next lock acquire timeout.
751
752 """
753 return self._timeout_strategy.Next()
754
757 (DEFER,
758 WAITDEP,
759 FINISHED) = range(1, 4)
760
763 """Initializes this class.
764
765 """
766 self.queue = queue
767 self.opexec_fn = opexec_fn
768 self.job = job
769 self._timeout_strategy_factory = _timeout_strategy_factory
770
771 @staticmethod
773 """Locates the next opcode to run.
774
775 @type job: L{_QueuedJob}
776 @param job: Job object
777 @param timeout_strategy_factory: Callable to create new timeout strategy
778
779 """
780
781
782
783
784 if job.ops_iter is None:
785 job.ops_iter = enumerate(job.ops)
786
787
788 while True:
789 try:
790 (idx, op) = job.ops_iter.next()
791 except StopIteration:
792 raise errors.ProgrammerError("Called for a finished job")
793
794 if op.status == constants.OP_STATUS_RUNNING:
795
796 raise errors.ProgrammerError("Called for job marked as running")
797
798 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
799 timeout_strategy_factory)
800
801 if op.status not in constants.OPS_FINALIZED:
802 return opctx
803
804
805
806
807
808 logging.info("%s: opcode %s already processed, skipping",
809 opctx.log_prefix, opctx.summary)
810
811 @staticmethod
846
847 @staticmethod
849 """Checks if an opcode has dependencies and if so, processes them.
850
851 @type queue: L{JobQueue}
852 @param queue: Queue object
853 @type job: L{_QueuedJob}
854 @param job: Job object
855 @type opctx: L{_OpExecContext}
856 @param opctx: Opcode execution context
857 @rtype: bool
858 @return: Whether opcode will be re-scheduled by dependency tracker
859
860 """
861 op = opctx.op
862
863 result = False
864
865 while opctx.jobdeps:
866 (dep_job_id, dep_status) = opctx.jobdeps[0]
867
868 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
869 dep_status)
870 assert ht.TNonEmptyString(depmsg), "No dependency message"
871
872 logging.info("%s: %s", opctx.log_prefix, depmsg)
873
874 if depresult == _JobDependencyManager.CONTINUE:
875
876 opctx.jobdeps.pop(0)
877
878 elif depresult == _JobDependencyManager.WAIT:
879
880
881 result = True
882 break
883
884 elif depresult == _JobDependencyManager.CANCEL:
885
886 job.Cancel()
887 assert op.status == constants.OP_STATUS_CANCELING
888 break
889
890 elif depresult in (_JobDependencyManager.WRONGSTATUS,
891 _JobDependencyManager.ERROR):
892
893 op.status = constants.OP_STATUS_ERROR
894 op.result = _EncodeOpError(errors.OpExecError(depmsg))
895 break
896
897 else:
898 raise errors.ProgrammerError("Unknown dependency result '%s'" %
899 depresult)
900
901 return result
902
904 """Processes one opcode and returns the result.
905
906 """
907 op = opctx.op
908
909 assert op.status in (constants.OP_STATUS_WAITING,
910 constants.OP_STATUS_CANCELING)
911
912
913 if op.status == constants.OP_STATUS_CANCELING:
914 return (constants.OP_STATUS_CANCELING, None)
915
916 timeout = opctx.GetNextLockTimeout()
917
918 try:
919
920 result = self.opexec_fn(op.input,
921 _OpExecCallbacks(self.queue, self.job, op),
922 timeout=timeout)
923 except mcpu.LockAcquireTimeout:
924 assert timeout is not None, "Received timeout for blocking acquire"
925 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
926
927 assert op.status in (constants.OP_STATUS_WAITING,
928 constants.OP_STATUS_CANCELING)
929
930
931 if op.status == constants.OP_STATUS_CANCELING:
932 return (constants.OP_STATUS_CANCELING, None)
933
934
935 return (constants.OP_STATUS_WAITING, None)
936 except CancelJob:
937 logging.exception("%s: Canceling job", opctx.log_prefix)
938 assert op.status == constants.OP_STATUS_CANCELING
939 return (constants.OP_STATUS_CANCELING, None)
940
941 except Exception, err:
942 logging.exception("%s: Caught exception in %s",
943 opctx.log_prefix, opctx.summary)
944 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
945 else:
946 logging.debug("%s: %s successful",
947 opctx.log_prefix, opctx.summary)
948 return (constants.OP_STATUS_SUCCESS, result)
949
951 """Continues execution of a job.
952
953 @param _nextop_fn: Callback function for tests
954 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
955 be deferred and C{WAITDEP} if the dependency manager
956 (L{_JobDependencyManager}) will re-schedule the job when appropriate
957
958 """
959 queue = self.queue
960 job = self.job
961
962 logging.debug("Processing job %s", job.id)
963
964 queue.acquire(shared=1)
965 try:
966 opcount = len(job.ops)
967
968 assert job.writable, "Expected writable job"
969
970
971 if job.CalcStatus() in constants.JOBS_FINALIZED:
972 return self.FINISHED
973
974
975 if job.cur_opctx:
976 opctx = job.cur_opctx
977 job.cur_opctx = None
978 else:
979 if __debug__ and _nextop_fn:
980 _nextop_fn()
981 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
982
983 op = opctx.op
984
985
986 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
987 constants.OP_STATUS_CANCELING)
988 for i in job.ops[opctx.index + 1:])
989
990 assert op.status in (constants.OP_STATUS_QUEUED,
991 constants.OP_STATUS_WAITING,
992 constants.OP_STATUS_CANCELING)
993
994 assert (op.priority <= constants.OP_PRIO_LOWEST and
995 op.priority >= constants.OP_PRIO_HIGHEST)
996
997 waitjob = None
998
999 if op.status != constants.OP_STATUS_CANCELING:
1000 assert op.status in (constants.OP_STATUS_QUEUED,
1001 constants.OP_STATUS_WAITING)
1002
1003
1004 if self._MarkWaitlock(job, op):
1005
1006 queue.UpdateJobUnlocked(job)
1007
1008 assert op.status == constants.OP_STATUS_WAITING
1009 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1010 assert job.start_timestamp and op.start_timestamp
1011 assert waitjob is None
1012
1013
1014 waitjob = self._CheckDependencies(queue, job, opctx)
1015
1016 assert op.status in (constants.OP_STATUS_WAITING,
1017 constants.OP_STATUS_CANCELING,
1018 constants.OP_STATUS_ERROR)
1019
1020 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1021 constants.OP_STATUS_ERROR)):
1022 logging.info("%s: opcode %s waiting for locks",
1023 opctx.log_prefix, opctx.summary)
1024
1025 assert not opctx.jobdeps, "Not all dependencies were removed"
1026
1027 queue.release()
1028 try:
1029 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1030 finally:
1031 queue.acquire(shared=1)
1032
1033 op.status = op_status
1034 op.result = op_result
1035
1036 assert not waitjob
1037
1038 if op.status in (constants.OP_STATUS_WAITING,
1039 constants.OP_STATUS_QUEUED):
1040
1041
1042 assert not op.end_timestamp
1043 else:
1044
1045 op.end_timestamp = TimeStampNow()
1046
1047 if op.status == constants.OP_STATUS_CANCELING:
1048 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1049 for i in job.ops[opctx.index:])
1050 else:
1051 assert op.status in constants.OPS_FINALIZED
1052
1053 if op.status == constants.OP_STATUS_QUEUED:
1054
1055 assert not waitjob
1056
1057 finalize = False
1058
1059
1060 job.cur_opctx = None
1061
1062
1063 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1064
1065 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1066 finalize = False
1067
1068 if not waitjob and opctx.CheckPriorityIncrease():
1069
1070 queue.UpdateJobUnlocked(job)
1071
1072
1073 job.cur_opctx = opctx
1074
1075 assert (op.priority <= constants.OP_PRIO_LOWEST and
1076 op.priority >= constants.OP_PRIO_HIGHEST)
1077
1078
1079 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1080
1081 else:
1082
1083 assert (opctx.index == 0 or
1084 compat.all(i.status == constants.OP_STATUS_SUCCESS
1085 for i in job.ops[:opctx.index]))
1086
1087
1088 job.cur_opctx = None
1089
1090 if op.status == constants.OP_STATUS_SUCCESS:
1091 finalize = False
1092
1093 elif op.status == constants.OP_STATUS_ERROR:
1094
1095
1096
1097
1098 to_encode = errors.OpExecError("Preceding opcode failed")
1099 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1100 _EncodeOpError(to_encode))
1101 finalize = True
1102 elif op.status == constants.OP_STATUS_CANCELING:
1103 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1104 "Job canceled by request")
1105 finalize = True
1106
1107 else:
1108 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1109
1110 if opctx.index == (opcount - 1):
1111
1112 finalize = True
1113
1114 if finalize:
1115
1116 job.Finalize()
1117
1118
1119
1120 queue.UpdateJobUnlocked(job)
1121
1122 assert not waitjob
1123
1124 if finalize:
1125 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1126 return self.FINISHED
1127
1128 assert not waitjob or queue.depmgr.JobWaiting(job)
1129
1130 if waitjob:
1131 return self.WAITDEP
1132 else:
1133 return self.DEFER
1134 finally:
1135 assert job.writable, "Job became read-only while being processed"
1136 queue.release()
1137
1160
1163 """The actual job workers.
1164
1165 """
1167 """Job executor.
1168
1169 @type job: L{_QueuedJob}
1170 @param job: the job to be processed
1171
1172 """
1173 assert job.writable, "Expected writable job"
1174
1175
1176
1177
1178 job.processor_lock.acquire()
1179 try:
1180 return self._RunTaskInner(job)
1181 finally:
1182 job.processor_lock.release()
1183
1204
1205 @staticmethod
1207 """Updates the worker thread name to include a short summary of the opcode.
1208
1209 @param setname_fn: Callable setting worker thread name
1210 @param execop_fn: Callable for executing opcode (usually
1211 L{mcpu.Processor.ExecOpCode})
1212
1213 """
1214 setname_fn(op)
1215 try:
1216 return execop_fn(op, *args, **kwargs)
1217 finally:
1218 setname_fn(None)
1219
1220 @staticmethod
1222 """Sets the worker thread name.
1223
1224 @type job: L{_QueuedJob}
1225 @type op: L{opcodes.OpCode}
1226
1227 """
1228 parts = ["Job%s" % job.id]
1229
1230 if op:
1231 parts.append(op.TinySummary())
1232
1233 return "/".join(parts)
1234
1237 """Simple class implementing a job-processing workerpool.
1238
1239 """
1245
1248 """Keeps track of job dependencies.
1249
1250 """
1251 (WAIT,
1252 ERROR,
1253 CANCEL,
1254 CONTINUE,
1255 WRONGSTATUS) = range(1, 6)
1256
1257 - def __init__(self, getstatus_fn, enqueue_fn):
1258 """Initializes this class.
1259
1260 """
1261 self._getstatus_fn = getstatus_fn
1262 self._enqueue_fn = enqueue_fn
1263
1264 self._waiters = {}
1265 self._lock = locking.SharedLock("JobDepMgr")
1266
1267 @locking.ssynchronized(_LOCK, shared=1)
1269 """Retrieves information about waiting jobs.
1270
1271 @type requested: set
1272 @param requested: Requested information, see C{query.LQ_*}
1273
1274 """
1275
1276
1277
1278 return [("job/%s" % job_id, None, None,
1279 [("job", [job.id for job in waiters])])
1280 for job_id, waiters in self._waiters.items()
1281 if waiters]
1282
1283 @locking.ssynchronized(_LOCK, shared=1)
1285 """Checks if a job is waiting.
1286
1287 """
1288 return compat.any(job in jobs
1289 for jobs in self._waiters.values())
1290
1291 @locking.ssynchronized(_LOCK)
1293 """Checks if a dependency job has the requested status.
1294
1295 If the other job is not yet in a finalized status, the calling job will be
1296 notified (re-added to the workerpool) at a later point.
1297
1298 @type job: L{_QueuedJob}
1299 @param job: Job object
1300 @type dep_job_id: int
1301 @param dep_job_id: ID of dependency job
1302 @type dep_status: list
1303 @param dep_status: Required status
1304
1305 """
1306 assert ht.TJobId(job.id)
1307 assert ht.TJobId(dep_job_id)
1308 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1309
1310 if job.id == dep_job_id:
1311 return (self.ERROR, "Job can't depend on itself")
1312
1313
1314 try:
1315 status = self._getstatus_fn(dep_job_id)
1316 except errors.JobLost, err:
1317 return (self.ERROR, "Dependency error: %s" % err)
1318
1319 assert status in constants.JOB_STATUS_ALL
1320
1321 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1322
1323 if status not in constants.JOBS_FINALIZED:
1324
1325 job_id_waiters.add(job)
1326 return (self.WAIT,
1327 "Need to wait for job %s, wanted status '%s'" %
1328 (dep_job_id, dep_status))
1329
1330
1331 if job in job_id_waiters:
1332 job_id_waiters.remove(job)
1333
1334 if (status == constants.JOB_STATUS_CANCELED and
1335 constants.JOB_STATUS_CANCELED not in dep_status):
1336 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1337
1338 elif not dep_status or status in dep_status:
1339 return (self.CONTINUE,
1340 "Dependency job %s finished with status '%s'" %
1341 (dep_job_id, status))
1342
1343 else:
1344 return (self.WRONGSTATUS,
1345 "Dependency job %s finished with status '%s',"
1346 " not one of '%s' as required" %
1347 (dep_job_id, status, utils.CommaJoin(dep_status)))
1348
1350 """Remove all jobs without actual waiters.
1351
1352 """
1353 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1354 if not waiters]:
1355 del self._waiters[job_id]
1356
1358 """Notifies all jobs waiting for a certain job ID.
1359
1360 @attention: Do not call until L{CheckAndRegister} returned a status other
1361 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1362 @type job_id: int
1363 @param job_id: Job ID
1364
1365 """
1366 assert ht.TJobId(job_id)
1367
1368 self._lock.acquire()
1369 try:
1370 self._RemoveEmptyWaitersUnlocked()
1371
1372 jobs = self._waiters.pop(job_id, None)
1373 finally:
1374 self._lock.release()
1375
1376 if jobs:
1377
1378 logging.debug("Re-adding %s jobs which were waiting for job %s",
1379 len(jobs), job_id)
1380 self._enqueue_fn(jobs)
1381
1384 """Queue used to manage the jobs.
1385
1386 """
1388 """Constructor for JobQueue.
1389
1390 The constructor will initialize the job queue object and then
1391 start loading the current jobs from disk, either for starting them
1392 (if they were queue) or for aborting them (if they were already
1393 running).
1394
1395 @type context: GanetiContext
1396 @param context: the context object for access to the configuration
1397 data and other ganeti objects
1398
1399 """
1400 self.primary_jid = None
1401 self.context = context
1402 self._memcache = weakref.WeakValueDictionary()
1403 self._my_hostname = netutils.Hostname.GetSysName()
1404
1405
1406
1407
1408
1409
1410 self._lock = locking.SharedLock("JobQueue")
1411
1412 self.acquire = self._lock.acquire
1413 self.release = self._lock.release
1414
1415
1416 self._last_serial = jstore.ReadSerial()
1417 assert self._last_serial is not None, ("Serial file was modified between"
1418 " check in jstore and here")
1419
1420
1421 self._nodes = dict((n.name, n.primary_ip)
1422 for n in cfg.GetAllNodesInfo().values()
1423 if n.master_candidate)
1424
1425
1426 self._nodes.pop(self._my_hostname, None)
1427
1428
1429
1430 self._queue_size = None
1431 self._UpdateQueueSizeUnlocked()
1432 assert ht.TInt(self._queue_size)
1433
1434
1435 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1436 self._EnqueueJobs)
1437
1438
1439 self._wpool = _JobQueueWorkerPool(self)
1440
1442 """Load a job from the job queue
1443
1444 Pick up a job that already is in the job queue and start/resume it.
1445
1446 """
1447 if self.primary_jid:
1448 logging.warning("Job process asked to pick up %s, but already has %s",
1449 job_id, self.primary_jid)
1450
1451 self.primary_jid = int(job_id)
1452
1453 job = self._LoadJobUnlocked(job_id)
1454
1455 if job is None:
1456 logging.warning("Job %s could not be read", job_id)
1457 return
1458
1459 job.AddReasons(pickup=True)
1460
1461 status = job.CalcStatus()
1462 if status == constants.JOB_STATUS_QUEUED:
1463 job.SetPid(os.getpid())
1464 self._EnqueueJobsUnlocked([job])
1465 logging.info("Restarting job %s", job.id)
1466
1467 elif status in (constants.JOB_STATUS_RUNNING,
1468 constants.JOB_STATUS_WAITING,
1469 constants.JOB_STATUS_CANCELING):
1470 logging.warning("Unfinished job %s found: %s", job.id, job)
1471
1472 if status == constants.JOB_STATUS_WAITING:
1473 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1474 job.SetPid(os.getpid())
1475 self._EnqueueJobsUnlocked([job])
1476 logging.info("Restarting job %s", job.id)
1477 else:
1478 to_encode = errors.OpExecError("Unclean master daemon shutdown")
1479 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1480 _EncodeOpError(to_encode))
1481 job.Finalize()
1482
1483 self.UpdateJobUnlocked(job)
1484
1485 @locking.ssynchronized(_LOCK)
1488
1490 """Gets RPC runner with context.
1491
1492 """
1493 return rpc.JobQueueRunner(self.context, address_list)
1494
  @locking.ssynchronized(_LOCK)
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node; failure is only logged since the
    # subsequent uploads overwrite the relevant files anyway
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list to bypass name resolution for the new node
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # NOTE(review): 'result' below is stale -- it still refers to the last
    # file upload from the loop above; no drain-flag RPC is visible here, so
    # this check re-logs a file-upload failure under a misleading message.
    # Confirm whether a call_jobqueue_set_drain_flag call went missing.
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    # Only register the node for replication once everything is uploaded
    self._nodes[node_name] = node.primary_ip
1545
1546 @locking.ssynchronized(_LOCK)
1548 """Callback called when removing nodes from the cluster.
1549
1550 @type node_name: str
1551 @param node_name: the name of the node to remove
1552
1553 """
1554 self._nodes.pop(node_name, None)
1555
1556 @staticmethod
1558 """Verifies the status of an RPC call.
1559
1560 Since we aim to keep consistency should this node (the current
1561 master) fail, we will log errors if our rpc fail, and especially
1562 log the case when more than half of the nodes fails.
1563
1564 @param result: the data as returned from the rpc call
1565 @type nodes: list
1566 @param nodes: the list of nodes we made the call to
1567 @type failmsg: str
1568 @param failmsg: the identifier to be used for logging
1569
1570 """
1571 failed = []
1572 success = []
1573
1574 for node in nodes:
1575 msg = result[node].fail_msg
1576 if msg:
1577 failed.append(node)
1578 logging.error("RPC call %s (%s) failed on node %s: %s",
1579 result[node].call, failmsg, node, msg)
1580 else:
1581 success.append(node)
1582
1583
1584 if (len(success) + 1) < len(failed):
1585
1586 logging.error("More than half of the nodes failed")
1587
1589 """Helper for returning the node name/ip list.
1590
1591 @rtype: (list, list)
1592 @return: a tuple of two lists, the first one with the node
1593 names and the second one with the node addresses
1594
1595 """
1596
1597 name_list = self._nodes.keys()
1598 addr_list = [self._nodes[name] for name in name_list]
1599 return name_list, addr_list
1600
1602 """Writes a file locally and then replicates it to all nodes.
1603
1604 This function will replace the contents of a file on the local
1605 node and then replicate it to all the other nodes we have.
1606
1607 @type file_name: str
1608 @param file_name: the path of the file to be replicated
1609 @type data: str
1610 @param data: the new contents of the file
1611 @type replicate: boolean
1612 @param replicate: whether to spread the changes to the remote nodes
1613
1614 """
1615 getents = runtime.GetEnts()
1616 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1617 gid=getents.daemons_gid,
1618 mode=constants.JOB_QUEUE_FILES_PERMS)
1619
1620 if replicate:
1621 names, addrs = self._GetNodeIp()
1622 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1623 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1624
1626 """Renames a file locally and then replicate the change.
1627
1628 This function will rename a file in the local queue directory
1629 and then replicate this rename to all the other nodes we have.
1630
1631 @type rename: list of (old, new)
1632 @param rename: List containing tuples mapping old to new names
1633
1634 """
1635
1636 for old, new in rename:
1637 utils.RenameFile(old, new, mkdir=True)
1638
1639
1640 names, addrs = self._GetNodeIp()
1641 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1642 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1643
1644 @staticmethod
1646 """Returns the job file for a given job id.
1647
1648 @type job_id: str
1649 @param job_id: the job identifier
1650 @rtype: str
1651 @return: the path to the job file
1652
1653 """
1654 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1655
1656 @staticmethod
1658 """Returns the archived job file for a give job id.
1659
1660 @type job_id: str
1661 @param job_id: the job identifier
1662 @rtype: str
1663 @return: the path to the archived job file
1664
1665 """
1666 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1667 jstore.GetArchiveDirectory(job_id),
1668 "job-%s" % job_id)
1669
1670 @staticmethod
1672 """Build list of directories containing job files.
1673
1674 @type archived: bool
1675 @param archived: Whether to include directories for archived jobs
1676 @rtype: list
1677
1678 """
1679 result = [pathutils.QUEUE_DIR]
1680
1681 if archived:
1682 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1683 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1684 utils.ListVisibleFiles(archive_path)))
1685
1686 return result
1687
1688 @classmethod
1690 """Return all known job IDs.
1691
1692 The method only looks at disk because it's a requirement that all
1693 jobs are present on disk (so in the _memcache we don't have any
1694 extra IDs).
1695
1696 @type sort: boolean
1697 @param sort: perform sorting on the returned job ids
1698 @rtype: list
1699 @return: the list of job IDs
1700
1701 """
1702 jlist = []
1703
1704 for path in cls._DetermineJobDirectories(archived):
1705 for filename in utils.ListVisibleFiles(path):
1706 m = constants.JOB_FILE_RE.match(filename)
1707 if m:
1708 jlist.append(int(m.group(1)))
1709
1710 if sort:
1711 jlist.sort()
1712 return jlist
1713
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    # Fast path: job already cached in memory
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      # Unparseable job file: archive it out of the way so it is not
      # retried on every load
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    # Cache the freshly-loaded job for subsequent lookups
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1756
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    # Candidate locations: live queue file first, then (optionally) the
    # archive; the boolean marks whether the path is an archive location
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        # Missing file just means "try the next location"; anything else
        # is a real error
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      logging.debug("No data available for job %s", job_id)
      # NOTE(review): typos in the warning below ("happy" -> "happen",
      # "desctruction" -> "destruction"); runtime string left unchanged here.
      if int(job_id) == self.primary_jid:
        logging.warning("My own job file (%s) disappeared;"
                        " this should only happy at cluster desctruction",
                        job_id)
        if mcpu.lusExecuting[0] == 0:
          # Nothing executing: the job file vanished under us, so exit hard
          # rather than continue with inconsistent state
          logging.warning("Not in execution; cleaning up myself due to missing"
                          " job file")
          logging.shutdown()
          os._exit(1)
      return None

    # NOTE(review): 'writable' appears to be an optional parameter of this
    # method (defaulting to None) -- confirm against the stripped signature.
    # None means "writable unless the job came from the archive".
    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err:
      # Any parse/restore failure is reported uniformly as a corrupted file
      raise errors.JobFileCorrupted(err)

    return job
1812
1814 """Load the given job file from disk.
1815
1816 Given a job file, read, load and restore it in a _QueuedJob format.
1817 In case of error reading the job, it gets returned as None, and the
1818 exception is logged.
1819
1820 @type job_id: int
1821 @param job_id: job identifier
1822 @type try_archived: bool
1823 @param try_archived: Whether to try loading an archived job
1824 @rtype: L{_QueuedJob} or None
1825 @return: either None or the job object
1826
1827 """
1828 try:
1829 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1830 except (errors.JobFileCorrupted, EnvironmentError):
1831 logging.exception("Can't load/parse job %s", job_id)
1832 return None
1833
1835 """Update the queue size.
1836
1837 """
1838 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1839
1840 @classmethod
1846
1847 @staticmethod
1854
1855 @staticmethod
1857 """Resolves relative job IDs in dependencies.
1858
1859 @type resolve_fn: callable
1860 @param resolve_fn: Function to resolve a relative job ID
1861 @type deps: list
1862 @param deps: Dependencies
1863 @rtype: tuple; (boolean, string or list)
1864 @return: If successful (first tuple item), the returned list contains
1865 resolved job IDs along with the requested status; if not successful,
1866 the second element is an error message
1867
1868 """
1869 result = []
1870
1871 for (dep_job_id, dep_status) in deps:
1872 if ht.TRelativeJobId(dep_job_id):
1873 assert ht.TInt(dep_job_id) and dep_job_id < 0
1874 try:
1875 job_id = resolve_fn(dep_job_id)
1876 except IndexError:
1877
1878 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
1879 else:
1880 job_id = dep_job_id
1881
1882 result.append((job_id, dep_status))
1883
1884 return (True, result)
1885
1886 @locking.ssynchronized(_LOCK)
1888 """Helper function to add jobs to worker pool's queue.
1889
1890 @type jobs: list
1891 @param jobs: List of all jobs
1892
1893 """
1894 return self._EnqueueJobsUnlocked(jobs)
1895
1897 """Helper function to add jobs to worker pool's queue.
1898
1899 @type jobs: list
1900 @param jobs: List of all jobs
1901
1902 """
1903 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
1904 self._wpool.AddManyTasks([(job, ) for job in jobs],
1905 priority=[job.CalcPriority() for job in jobs],
1906 task_id=map(_GetIdAttr, jobs))
1907
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found
    @return: the job's current status as computed by CalcStatus

    """
    # Not using the in-memory cache here: a read-only load avoids the
    # exclusive lock that cached (writable) jobs would require

    # Try to load from disk (including the archive)
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)
1928
1930 """Update a job's on disk storage.
1931
1932 After a job has been modified, this function needs to be called in
1933 order to write the changes to disk and replicate them to the other
1934 nodes.
1935
1936 @type job: L{_QueuedJob}
1937 @param job: the changed job
1938 @type replicate: boolean
1939 @param replicate: whether to replicate the change to remote nodes
1940
1941 """
1942 if __debug__:
1943 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1944 assert (finalized ^ (job.end_timestamp is None))
1945 assert job.writable, "Can't update read-only job"
1946 assert not job.archived, "Can't update archived job"
1947
1948 filename = self._GetJobPath(job.id)
1949 data = serializer.DumpJson(job.Serialize())
1950 logging.debug("Writing job %s to %s", job.id, filename)
1951 self._UpdateJobQueueFile(filename, data, replicate)
1952
1954 """Checks if a job has been finalized.
1955
1956 @type job_id: int
1957 @param job_id: Job identifier
1958 @rtype: boolean
1959 @return: True if the job has been finalized,
1960 False if the timeout has been reached,
1961 None if the job doesn't exist
1962
1963 """
1964 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1965 if job is not None:
1966 return job.CalcStatus() in constants.JOBS_FINALIZED
1967 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
1968
1969
1970
1971 return True
1972 else:
1973 return None
1974
1975 @locking.ssynchronized(_LOCK)
1977 """Cancels a job.
1978
1979 This will only succeed if the job has not started yet.
1980
1981 @type job_id: int
1982 @param job_id: job ID of job to be cancelled.
1983
1984 """
1985 logging.info("Cancelling job %s", job_id)
1986
1987 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1988
1989 @locking.ssynchronized(_LOCK)
1991 """Changes a job's priority.
1992
1993 @type job_id: int
1994 @param job_id: ID of the job whose priority should be changed
1995 @type priority: int
1996 @param priority: New priority
1997
1998 """
1999 logging.info("Changing priority of job %s to %s", job_id, priority)
2000
2001 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2002 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2003 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2004 (priority, allowed))
2005
2006 def fn(job):
2007 (success, msg) = job.ChangePriority(priority)
2008
2009 if success:
2010 try:
2011 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2012 except workerpool.NoSuchTask:
2013 logging.debug("Job %s is not in workerpool at this time", job.id)
2014
2015 return (success, msg)
2016
2017 return self._ModifyJobUnlocked(job_id, fn)
2018
2020 """Modifies a job.
2021
2022 @type job_id: int
2023 @param job_id: Job ID
2024 @type mod_fn: callable
2025 @param mod_fn: Modifying function, receiving job object as parameter,
2026 returning tuple of (status boolean, message string)
2027
2028 """
2029 job = self._LoadJobUnlocked(job_id)
2030 if not job:
2031 logging.debug("Job %s not found", job_id)
2032 return (False, "Job %s not found" % job_id)
2033
2034 assert job.writable, "Can't modify read-only job"
2035 assert not job.archived, "Can't modify archived job"
2036
2037 (success, msg) = mod_fn(job)
2038
2039 if success:
2040
2041
2042 self.UpdateJobUnlocked(job)
2043
2044 return (success, msg)
2045
2047 """Archives jobs.
2048
2049 @type jobs: list of L{_QueuedJob}
2050 @param jobs: Job objects
2051 @rtype: int
2052 @return: Number of archived jobs
2053
2054 """
2055 archive_jobs = []
2056 rename_files = []
2057 for job in jobs:
2058 assert job.writable, "Can't archive read-only job"
2059 assert not job.archived, "Can't cancel archived job"
2060
2061 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2062 logging.debug("Job %s is not yet done", job.id)
2063 continue
2064
2065 archive_jobs.append(job)
2066
2067 old = self._GetJobPath(job.id)
2068 new = self._GetArchivedJobPath(job.id)
2069 rename_files.append((old, new))
2070
2071
2072 self._RenameFilesUnlocked(rename_files)
2073
2074 logging.debug("Successfully archived job(s) %s",
2075 utils.CommaJoin(job.id for job in archive_jobs))
2076
2077
2078
2079
2080
2081 self._UpdateQueueSizeUnlocked()
2082 return len(archive_jobs)
2083
2084 - def _Query(self, fields, qfilter):
2085 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2086 namefield="id")
2087
2088
2089
2090
2091 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2092
2093 job_ids = qobj.RequestedNames()
2094
2095 list_all = (job_ids is None)
2096
2097 if list_all:
2098
2099
2100 job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2101
2102 jobs = []
2103
2104 for job_id in job_ids:
2105 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2106 if job is not None or not list_all:
2107 jobs.append((job_id, job))
2108
2109 return (qobj, jobs, list_all)
2110
2112 """Returns a list of jobs in queue.
2113
2114 @type fields: sequence
2115 @param fields: List of wanted fields
2116 @type qfilter: None or query2 filter (list)
2117 @param qfilter: Query filter
2118
2119 """
2120 (qobj, ctx, _) = self._Query(fields, qfilter)
2121
2122 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2123
2125 """Returns a list of jobs in queue.
2126
2127 @type job_ids: list
2128 @param job_ids: sequence of job identifiers or None for all
2129 @type fields: list
2130 @param fields: names of fields to return
2131 @rtype: list
2132 @return: list one element per job, each element being list with
2133 the requested fields
2134
2135 """
2136
2137 job_ids = [int(jid) for jid in job_ids]
2138 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2139
2140 (qobj, ctx, _) = self._Query(fields, qfilter)
2141
2142 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2143
2144 @locking.ssynchronized(_LOCK)
2146 """Prepare to stop the job queue.
2147
2148 Returns whether there are any jobs currently running. If the latter is the
2149 case, the job queue is not yet ready for shutdown. Once this function
2150 returns C{True} L{Shutdown} can be called without interfering with any job.
2151
2152 @rtype: bool
2153 @return: Whether there are any running jobs
2154
2155 """
2156 return self._wpool.HasRunningTasks()
2157
2158 @locking.ssynchronized(_LOCK)
2160 """Stops the job queue.
2161
2162 This shutdowns all the worker threads an closes the queue.
2163
2164 """
2165 self._wpool.TerminateWorkers()
2166