22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pyinotify may be shipped as a package with a nested module or as a plain top-level module
41 from pyinotify import pyinotify
42 except ImportError:
43 import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59
60
61 JOBQUEUE_THREADS = 25
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63
64
65 _LOCK = "_lock"
66 _QUEUE = "_queue"
69 class CancelJob(Exception):
70 """Special exception to cancel a job.
71
72 """
73
75 def TimeStampNow():
76 """Returns the current timestamp.
77
78 @rtype: tuple
79 @return: the current time in the (seconds, microseconds) format
80
81 """
82 return utils.SplitTime(time.time())
83
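# Illustrative sketch (not part of the queue itself): utils.SplitTime is used
# above to turn a float UNIX timestamp into the (seconds, microseconds) pair
# documented in TimeStampNow. A minimal stand-alone equivalent, for reference
# only; the real implementation lives in ganeti.utils.
def _example_split_time(value):
  seconds = int(value)
  microseconds = int(round((value - seconds) * 1000000))
  return (seconds, microseconds)

# _example_split_time(1234567890.25) == (1234567890, 250000)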
85 class _QueuedOpCode(object):
86 """Encapsulates an opcode object.
87
88 @ivar log: holds the execution log and consists of tuples
89 of the form C{(log_serial, timestamp, level, message)}
90 @ivar input: the OpCode we encapsulate
91 @ivar status: the current status
92 @ivar result: the result of the LU execution
93 @ivar start_timestamp: timestamp for the start of the execution
94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 @ivar end_timestamp: timestamp for the end of the execution
96
97 """
98 __slots__ = ["input", "status", "result", "log", "priority",
99 "start_timestamp", "exec_timestamp", "end_timestamp",
100 "__weakref__"]
101
102 def __init__(self, op):
103 """Constructor for the _QueuedOpCode.
104
105 @type op: L{opcodes.OpCode}
106 @param op: the opcode we encapsulate
107
108 """
109 self.input = op
110 self.status = constants.OP_STATUS_QUEUED
111 self.result = None
112 self.log = []
113 self.start_timestamp = None
114 self.exec_timestamp = None
115 self.end_timestamp = None
116
117
118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119
120 @classmethod
121 def Restore(cls, state):
122 """Restore the _QueuedOpCode from the serialized form.
123
124 @type state: dict
125 @param state: the serialized state
126 @rtype: _QueuedOpCode
127 @return: a new _QueuedOpCode instance
128
129 """
130 obj = _QueuedOpCode.__new__(cls)
131 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 obj.status = state["status"]
133 obj.result = state["result"]
134 obj.log = state["log"]
135 obj.start_timestamp = state.get("start_timestamp", None)
136 obj.exec_timestamp = state.get("exec_timestamp", None)
137 obj.end_timestamp = state.get("end_timestamp", None)
138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139 return obj
140
141 def Serialize(self):
142 """Serializes this _QueuedOpCode.
143
144 @rtype: dict
145 @return: the dictionary holding the serialized state
146
147 """
148 return {
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
152 "log": self.log,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,
157 }
158
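# Usage sketch (illustrative; "op" stands for any valid opcodes.OpCode
# instance): Serialize() and Restore() are designed to round-trip, which is
# what the on-disk job files rely on.
#
#   qop = _QueuedOpCode(op)
#   state = qop.Serialize()
#   restored = _QueuedOpCode.Restore(state)
#   # restored.status == qop.status and restored.priority == qop.priority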
160 class _QueuedJob(object):
161 """In-memory job representation.
162
163 This is what we use to track the user-submitted jobs. Locking must
164 be taken care of by users of this class.
165
166 @type queue: L{JobQueue}
167 @ivar queue: the parent queue
168 @ivar id: the job ID
169 @type ops: list
170 @ivar ops: the list of _QueuedOpCode that constitute the job
171 @type log_serial: int
172 @ivar log_serial: holds the index for the next log entry
173 @ivar received_timestamp: the timestamp for when the job was received
174 @ivar start_timestamp: the timestamp for start of execution
175 @ivar end_timestamp: the timestamp for end of execution
176
177 """
178
179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 "received_timestamp", "start_timestamp", "end_timestamp",
181 "__weakref__"]
182
183 def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob.
185
186 @type queue: L{JobQueue}
187 @param queue: our parent queue
188 @type job_id: job_id
189 @param job_id: our job id
190 @type ops: list
191 @param ops: the list of opcodes we hold, which will be encapsulated
192 in _QueuedOpCodes
193
194 """
195 if not ops:
196 raise errors.GenericError("A job needs at least one opcode")
197
198 self.queue = queue
199 self.id = job_id
200 self.ops = [_QueuedOpCode(op) for op in ops]
201 self.log_serial = 0
202 self.received_timestamp = TimeStampNow()
203 self.start_timestamp = None
204 self.end_timestamp = None
205
206 self._InitInMemory(self)
207
208 @staticmethod
209 def _InitInMemory(obj):
210 """Initializes in-memory variables.
211
212 """
213 obj.ops_iter = None
214 obj.cur_opctx = None
215
216 def __repr__(self):
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218 "id=%s" % self.id,
219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220
221 return "<%s at %#x>" % (" ".join(status), id(self))
222
223 @classmethod
224 def Restore(cls, queue, state):
225 """Restore a _QueuedJob from serialized state.
226
227 @type queue: L{JobQueue}
228 @param queue: to which queue the restored job belongs
229 @type state: dict
230 @param state: the serialized state
231 @rtype: _QueuedJob
232 @return: the restored _QueuedJob instance
233
234 """
235 obj = _QueuedJob.__new__(cls)
236 obj.queue = queue
237 obj.id = state["id"]
238 obj.received_timestamp = state.get("received_timestamp", None)
239 obj.start_timestamp = state.get("start_timestamp", None)
240 obj.end_timestamp = state.get("end_timestamp", None)
241
242 obj.ops = []
243 obj.log_serial = 0
244 for op_state in state["ops"]:
245 op = _QueuedOpCode.Restore(op_state)
246 for log_entry in op.log:
247 obj.log_serial = max(obj.log_serial, log_entry[0])
248 obj.ops.append(op)
249
250 cls._InitInMemory(obj)
251
252 return obj
253
254 def Serialize(self):
255 """Serialize the _QueuedJob instance.
256
257 @rtype: dict
258 @return: the serialized state
259
260 """
261 return {
262 "id": self.id,
263 "ops": [op.Serialize() for op in self.ops],
264 "start_timestamp": self.start_timestamp,
265 "end_timestamp": self.end_timestamp,
266 "received_timestamp": self.received_timestamp,
267 }
268
321
322 def CalcPriority(self):
323 """Gets the current priority for this job.
324
325 Only unfinished opcodes are considered. When all are done, the default
326 priority is used.
327
328 @rtype: int
329
330 """
331 priorities = [op.priority for op in self.ops
332 if op.status not in constants.OPS_FINALIZED]
333
334 if not priorities:
335
336 return constants.OP_PRIO_DEFAULT
337
338 return min(priorities)
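# Illustration of the priority semantics behind the min() above (as also
# implied by CheckPriorityIncrease below): priorities are integers where a
# numerically smaller value is more urgent, so a job is as urgent as its most
# urgent unfinished opcode.
#
#   min([constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT - 1])
#   # -> the numerically smaller (more urgent) value wins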
339
340 def GetLogEntries(self, newer_than):
341 """Selectively returns the log entries.
342
343 @type newer_than: None or int
344 @param newer_than: if this is None, return all log entries,
345 otherwise return only the log entries with serial higher
346 than this value
347 @rtype: list
348 @return: the list of the log entries selected
349
350 """
351 if newer_than is None:
352 serial = -1
353 else:
354 serial = newer_than
355
356 entries = []
357 for op in self.ops:
358 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359
360 return entries
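# Client-side polling sketch (hypothetical caller, not part of this module):
# remember the highest serial seen and only ask for newer entries on the next
# call, mirroring the newer_than contract above.
#
#   last_serial = None
#   for (serial, timestamp, log_type, message) in job.GetLogEntries(last_serial):
#     last_serial = serial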
361
362 def GetInfo(self, fields):
363 """Returns information about a job.
364
365 @type fields: list
366 @param fields: names of fields to return
367 @rtype: list
368 @return: list with one element for each field
369 @raise errors.OpExecError: when an invalid field
370 has been passed
371
372 """
373 row = []
374 for fname in fields:
375 if fname == "id":
376 row.append(self.id)
377 elif fname == "status":
378 row.append(self.CalcStatus())
379 elif fname == "priority":
380 row.append(self.CalcPriority())
381 elif fname == "ops":
382 row.append([op.input.__getstate__() for op in self.ops])
383 elif fname == "opresult":
384 row.append([op.result for op in self.ops])
385 elif fname == "opstatus":
386 row.append([op.status for op in self.ops])
387 elif fname == "oplog":
388 row.append([op.log for op in self.ops])
389 elif fname == "opstart":
390 row.append([op.start_timestamp for op in self.ops])
391 elif fname == "opexec":
392 row.append([op.exec_timestamp for op in self.ops])
393 elif fname == "opend":
394 row.append([op.end_timestamp for op in self.ops])
395 elif fname == "oppriority":
396 row.append([op.priority for op in self.ops])
397 elif fname == "received_ts":
398 row.append(self.received_timestamp)
399 elif fname == "start_ts":
400 row.append(self.start_timestamp)
401 elif fname == "end_ts":
402 row.append(self.end_timestamp)
403 elif fname == "summary":
404 row.append([op.input.Summary() for op in self.ops])
405 else:
406 raise errors.OpExecError("Invalid self query field '%s'" % fname)
407 return row
408
409 def MarkUnfinishedOps(self, status, result):
410 """Mark unfinished opcodes with a given status and result.
411
412 This is a utility function for marking all running or waiting to
413 be run opcodes with a given status. Opcodes which are already
414 finalised are not changed.
415
416 @param status: a given opcode status
417 @param result: the opcode result
418
419 """
420 not_marked = True
421 for op in self.ops:
422 if op.status in constants.OPS_FINALIZED:
423 assert not_marked, "Finalized opcodes found after non-finalized ones"
424 continue
425 op.status = status
426 op.result = result
427 not_marked = False
428
452
454 class _OpExecCallbacks(mcpu.OpExecCbBase):
455 def __init__(self, queue, job, op):
456 """Initializes this class.
457
458 @type queue: L{JobQueue}
459 @param queue: Job queue
460 @type job: L{_QueuedJob}
461 @param job: Job object
462 @type op: L{_QueuedOpCode}
463 @param op: OpCode
464
465 """
466 assert queue, "Queue is missing"
467 assert job, "Job is missing"
468 assert op, "Opcode is missing"
469
470 self._queue = queue
471 self._job = job
472 self._op = op
473
474 def _CheckCancel(self):
475 """Raises an exception to cancel the job if asked to.
476
477 """
478
479 if self._op.status == constants.OP_STATUS_CANCELING:
480 logging.debug("Canceling opcode")
481 raise CancelJob()
482
483 @locking.ssynchronized(_QUEUE, shared=1)
484 def NotifyStart(self):
485 """Mark the opcode as running, not lock-waiting.
486
487 This is called from the mcpu code as a notifier function, when the LU is
488 finally about to start the Exec() method. Of course, to have end-user
489 visible results, the opcode must be initially (before calling into
490 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
491
492 """
493 assert self._op in self._job.ops
494 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495 constants.OP_STATUS_CANCELING)
496
497
498 self._CheckCancel()
499
500 logging.debug("Opcode is now running")
501
502 self._op.status = constants.OP_STATUS_RUNNING
503 self._op.exec_timestamp = TimeStampNow()
504
505
506 self._queue.UpdateJobUnlocked(self._job)
507
508 @locking.ssynchronized(_QUEUE, shared=1)
509 def _AppendFeedback(self, timestamp, log_type, log_msg):
510 """Internal feedback append function, with locks
511
512 """
513 self._job.log_serial += 1
514 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515 self._queue.UpdateJobUnlocked(self._job, replicate=False)
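# Example of the resulting log shape (illustrative values): each entry
# appended above is a (serial, timestamp, log_type, message) tuple, and the
# serial is counted per job, not per opcode, e.g.
#   [(1, (1300000000, 0), constants.ELOG_MESSAGE, "step one"),
#    (2, (1300000001, 0), constants.ELOG_MESSAGE, "step two")]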
516
533
543
545 class _JobChangesChecker(object):
546 def __init__(self, fields, prev_job_info, prev_log_serial):
547 """Initializes this class.
548
549 @type fields: list of strings
550 @param fields: Fields requested by LUXI client
551 @type prev_job_info: list or None
552 @param prev_job_info: previous job info, as passed by the LUXI client
553 @type prev_log_serial: int
554 @param prev_log_serial: previous job serial, as passed by the LUXI client
555
556 """
557 self._fields = fields
558 self._prev_job_info = prev_job_info
559 self._prev_log_serial = prev_log_serial
560
595
597 class _JobFileChangesWaiter(object):
598 def __init__(self, filename):
599 """Initializes this class.
600
601 @type filename: string
602 @param filename: Path to job file
603 @raises errors.InotifyError: if the notifier cannot be setup
604
605 """
606 self._wm = pyinotify.WatchManager()
607 self._inotify_handler = \
608 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
609 self._notifier = \
610 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
611 try:
612 self._inotify_handler.enable()
613 except Exception:
614
615 self._notifier.stop()
616 raise
617
618 def _OnInotify(self, notifier_enabled):
619 """Callback for inotify.
620
621 """
622 if not notifier_enabled:
623 self._inotify_handler.enable()
624
625 def Wait(self, timeout):
626 """Waits for the job file to change.
627
628 @type timeout: float
629 @param timeout: Timeout in seconds
630 @return: Whether there have been events
631
632 """
633 assert timeout >= 0
634 have_events = self._notifier.check_events(timeout * 1000)
635 if have_events:
636 self._notifier.read_events()
637 self._notifier.process_events()
638 return have_events
639
640 def Close(self):
641 """Closes underlying notifier and its file descriptor.
642
643 """
644 self._notifier.stop()
645
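# Usage sketch (hypothetical file path, for illustration only): wait up to two
# seconds for a queue file to change, always releasing the inotify watch.
#
#   waiter = _JobFileChangesWaiter("/path/to/queue/job-1")
#   try:
#     changed = waiter.Wait(2.0)
#   finally:
#     waiter.Close()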
647 class _JobChangesWaiter(object):
648 def __init__(self, filename):
649 """Initializes this class.
650
651 @type filename: string
652 @param filename: Path to job file
653
654 """
655 self._filewaiter = None
656 self._filename = filename
657
658 def Wait(self, timeout):
659 """Waits for a job to change.
660
661 @type timeout: float
662 @param timeout: Timeout in seconds
663 @return: Whether there have been events
664
665 """
666 if self._filewaiter:
667 return self._filewaiter.Wait(timeout)
668
669
670
671
672
673 self._filewaiter = _JobFileChangesWaiter(self._filename)
674
675 return True
676
677 def Close(self):
678 """Closes underlying waiter.
679
680 """
681 if self._filewaiter:
682 self._filewaiter.Close()
683
685 class _WaitForJobChangesHelper(object):
686 """Helper class using inotify to wait for changes in a job file.
687
688 This class takes a previous job status and serial, and alerts the client when
689 the current job status has changed.
690
691 """
692 @staticmethod
693 def _CheckForChanges(job_load_fn, check_fn):
694 job = job_load_fn()
695 if not job:
696 raise errors.JobLost()
697
698 result = check_fn(job)
699 if result is None:
700 raise utils.RetryAgain()
701
702 return result
703
704 def __call__(self, filename, job_load_fn,
705 fields, prev_job_info, prev_log_serial, timeout):
706 """Waits for changes on a job.
707
708 @type filename: string
709 @param filename: File on which to wait for changes
710 @type job_load_fn: callable
711 @param job_load_fn: Function to load job
712 @type fields: list of strings
713 @param fields: Which fields to check for changes
714 @type prev_job_info: list or None
715 @param prev_job_info: Last job information returned
716 @type prev_log_serial: int
717 @param prev_log_serial: Last job message serial number
718 @type timeout: float
719 @param timeout: maximum time to wait in seconds
720
721 """
722 try:
723 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724 waiter = _JobChangesWaiter(filename)
725 try:
726 return utils.Retry(compat.partial(self._CheckForChanges,
727 job_load_fn, check_fn),
728 utils.RETRY_REMAINING_TIME, timeout,
729 wait_fn=waiter.Wait)
730 finally:
731 waiter.Close()
732 except (errors.InotifyError, errors.JobLost):
733 return None
734 except utils.RetryTimeout:
735 return constants.JOB_NOTCHANGED
736
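# Stand-alone sketch of the retry/wait pattern combined above via utils.Retry
# (assumed semantics: "check" returns None while nothing changed and a result
# otherwise; "wait" blocks for at most the given number of seconds). For
# illustration only; the real code delegates all of this to utils.Retry and
# the inotify-based waiter.
def _example_retry_until_changed(check, wait, timeout):
  deadline = time.time() + timeout
  while True:
    result = check()
    if result is not None:
      return result
    remaining = deadline - time.time()
    if remaining <= 0:
      return None
    wait(remaining)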
748
750 class _TimeoutStrategyWrapper:
751 def __init__(self, fn):
752 """Initializes this class.
753
754 """
755 self._fn = fn
756 self._next = None
757
758 def _Advance(self):
759 """Gets the next timeout if necessary.
760
761 """
762 if self._next is None:
763 self._next = self._fn()
764
765 def Peek(self):
766 """Returns the next timeout.
767
768 """
769 self._Advance()
770 return self._next
771
772 def Next(self):
773 """Returns the current timeout and advances the internal state.
774
775 """
776 self._Advance()
777 result = self._next
778 self._next = None
779 return result
780
781
782 class _OpExecContext:
783 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784 """Initializes this class.
785
786 """
787 self.op = op
788 self.index = index
789 self.log_prefix = log_prefix
790 self.summary = op.input.Summary()
791
792 self._timeout_strategy_factory = timeout_strategy_factory
793 self._ResetTimeoutStrategy()
794
795 def _ResetTimeoutStrategy(self):
796 """Creates a new timeout strategy.
797
798 """
799 self._timeout_strategy = \
800 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
801
802 def CheckPriorityIncrease(self):
803 """Checks whether priority can and should be increased.
804
805 Called when locks couldn't be acquired.
806
807 """
808 op = self.op
809
810
811
812 if (self._timeout_strategy.Peek() is None and
813 op.priority > constants.OP_PRIO_HIGHEST):
814 logging.debug("Increasing priority")
815 op.priority -= 1
816 self._ResetTimeoutStrategy()
817 return True
818
819 return False
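# Illustration of how the two methods above and below are meant to interact
# (assumed flow, mirroring _JobProcessor further down): every failed lock
# acquisition first consumes the next timeout from the strategy; only once the
# strategy is exhausted (Peek() returns None) is the opcode priority stepped
# up (numerically decreased) and the timeout series restarted.
#
#   timeout = opctx.GetNextLockTimeout()
#   # ... lock acquisition with this timeout times out ...
#   opctx.CheckPriorityIncrease()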
820
821 def GetNextLockTimeout(self):
822 """Returns the next lock acquire timeout.
823
824 """
825 return self._timeout_strategy.Next()
826
828 class _JobProcessor(object):
829 def __init__(self, queue, opexec_fn, job,
830 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831 """Initializes this class.
832
833 """
834 self.queue = queue
835 self.opexec_fn = opexec_fn
836 self.job = job
837 self._timeout_strategy_factory = _timeout_strategy_factory
838
839 @staticmethod
840 def _FindNextOpcode(job, timeout_strategy_factory):
841 """Locates the next opcode to run.
842
843 @type job: L{_QueuedJob}
844 @param job: Job object
845 @param timeout_strategy_factory: Callable to create new timeout strategy
846
847 """
848
849
850
851
852 if job.ops_iter is None:
853 job.ops_iter = enumerate(job.ops)
854
855
856 while True:
857 try:
858 (idx, op) = job.ops_iter.next()
859 except StopIteration:
860 raise errors.ProgrammerError("Called for a finished job")
861
862 if op.status == constants.OP_STATUS_RUNNING:
863
864 raise errors.ProgrammerError("Called for job marked as running")
865
866 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 timeout_strategy_factory)
868
869 if op.status == constants.OP_STATUS_CANCELED:
870
871 assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872 for i in job.ops[idx:])
873
874 elif op.status in constants.OPS_FINALIZED:
875
876
877
878
879 logging.info("%s: opcode %s already processed, skipping",
880 opctx.log_prefix, opctx.summary)
881 continue
882
883 return opctx
884
885 @staticmethod
886 def _MarkWaitlock(job, op):
887 """Marks an opcode as waiting for locks.
888
889 The job's start timestamp is also set if necessary.
890
891 @type job: L{_QueuedJob}
892 @param job: Job object
893 @type op: L{_QueuedOpCode}
894 @param op: Opcode object
895
896 """
897 assert op in job.ops
898
899 op.status = constants.OP_STATUS_WAITLOCK
900 op.result = None
901 op.start_timestamp = TimeStampNow()
902
903 if job.start_timestamp is None:
904 job.start_timestamp = op.start_timestamp
905
906 def _ExecOpCodeUnlocked(self, opctx):
907 """Processes one opcode and returns the result.
908
909 """
910 op = opctx.op
911
912 assert op.status == constants.OP_STATUS_WAITLOCK
913
914 timeout = opctx.GetNextLockTimeout()
915
916 try:
917
918 result = self.opexec_fn(op.input,
919 _OpExecCallbacks(self.queue, self.job, op),
920 timeout=timeout, priority=op.priority)
921 except mcpu.LockAcquireTimeout:
922 assert timeout is not None, "Received timeout for blocking acquire"
923 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
924
925 assert op.status in (constants.OP_STATUS_WAITLOCK,
926 constants.OP_STATUS_CANCELING)
927
928
929 if op.status == constants.OP_STATUS_CANCELING:
930 return (constants.OP_STATUS_CANCELING, None)
931
932 return (constants.OP_STATUS_QUEUED, None)
933 except CancelJob:
934 logging.exception("%s: Canceling job", opctx.log_prefix)
935 assert op.status == constants.OP_STATUS_CANCELING
936 return (constants.OP_STATUS_CANCELING, None)
937 except Exception, err:
938 logging.exception("%s: Caught exception in %s",
939 opctx.log_prefix, opctx.summary)
940 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
941 else:
942 logging.debug("%s: %s successful",
943 opctx.log_prefix, opctx.summary)
944 return (constants.OP_STATUS_SUCCESS, result)
945
946 def __call__(self, _nextop_fn=None):
947 """Continues execution of a job.
948
949 @param _nextop_fn: Callback function for tests
950 @rtype: bool
951 @return: True if job is finished, False if processor needs to be called
952 again
953
954 """
955 queue = self.queue
956 job = self.job
957
958 logging.debug("Processing job %s", job.id)
959
960 queue.acquire(shared=1)
961 try:
962 opcount = len(job.ops)
963
964
965 if job.cur_opctx:
966 opctx = job.cur_opctx
967 else:
968 if __debug__ and _nextop_fn:
969 _nextop_fn()
970 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
971
972 op = opctx.op
973
974
975 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
976 constants.OP_STATUS_CANCELED)
977 for i in job.ops[opctx.index:])
978
979 assert op.status in (constants.OP_STATUS_QUEUED,
980 constants.OP_STATUS_WAITLOCK,
981 constants.OP_STATUS_CANCELED)
982
983 assert (op.priority <= constants.OP_PRIO_LOWEST and
984 op.priority >= constants.OP_PRIO_HIGHEST)
985
986 if op.status != constants.OP_STATUS_CANCELED:
987
988 self._MarkWaitlock(job, op)
989
990 assert op.status == constants.OP_STATUS_WAITLOCK
991 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
992
993
994 queue.UpdateJobUnlocked(job)
995
996 logging.info("%s: opcode %s waiting for locks",
997 opctx.log_prefix, opctx.summary)
998
999 queue.release()
1000 try:
1001 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1002 finally:
1003 queue.acquire(shared=1)
1004
1005 op.status = op_status
1006 op.result = op_result
1007
1008 if op.status == constants.OP_STATUS_QUEUED:
1009
1010 assert not op.end_timestamp
1011 else:
1012
1013 op.end_timestamp = TimeStampNow()
1014
1015 if op.status == constants.OP_STATUS_CANCELING:
1016 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1017 for i in job.ops[opctx.index:])
1018 else:
1019 assert op.status in constants.OPS_FINALIZED
1020
1021 if op.status == constants.OP_STATUS_QUEUED:
1022 finalize = False
1023
1024 opctx.CheckPriorityIncrease()
1025
1026
1027 job.cur_opctx = opctx
1028
1029 assert (op.priority <= constants.OP_PRIO_LOWEST and
1030 op.priority >= constants.OP_PRIO_HIGHEST)
1031
1032
1033 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1034
1035 queue.UpdateJobUnlocked(job)
1036
1037 else:
1038
1039 assert (opctx.index == 0 or
1040 compat.all(i.status == constants.OP_STATUS_SUCCESS
1041 for i in job.ops[:opctx.index]))
1042
1043
1044 job.cur_opctx = None
1045
1046 if op.status == constants.OP_STATUS_SUCCESS:
1047 finalize = False
1048
1049 elif op.status == constants.OP_STATUS_ERROR:
1050
1051 assert errors.GetEncodedError(job.ops[opctx.index].result)
1052
1053 to_encode = errors.OpExecError("Preceding opcode failed")
1054 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1055 _EncodeOpError(to_encode))
1056 finalize = True
1057
1058
1059 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1060 errors.GetEncodedError(i.result)
1061 for i in job.ops[opctx.index:])
1062
1063 elif op.status == constants.OP_STATUS_CANCELING:
1064 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1065 "Job canceled by request")
1066 finalize = True
1067
1068 elif op.status == constants.OP_STATUS_CANCELED:
1069 finalize = True
1070
1071 else:
1072 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1073
1074
1075 if finalize or opctx.index == (opcount - 1):
1076
1077 job.end_timestamp = TimeStampNow()
1078
1079
1080
1081 queue.UpdateJobUnlocked(job)
1082
1083 if finalize or opctx.index == (opcount - 1):
1084 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1085 return True
1086
1087 return False
1088 finally:
1089 queue.release()
1090
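# Stand-alone sketch of the contract used by the worker below (assumed
# semantics from __call__ above: it returns True when the job is finished and
# False when the processor should be invoked again later). A naive driver
# would simply loop; the real worker instead re-queues itself through
# workerpool.DeferTask so other jobs can run in between.
def _example_drive_job(process_once):
  while not process_once():
    pass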
1092 class _JobQueueWorker(workerpool.Worker):
1093 """The actual job workers.
1094
1095 """
1096 def RunTask(self, job):
1097 """Job executor.
1098
1099 This function processes a job. It is closely tied to the L{_QueuedJob} and
1100 L{_QueuedOpCode} classes.
1101
1102 @type job: L{_QueuedJob}
1103 @param job: the job to be processed
1104
1105 """
1106 queue = job.queue
1107 assert queue == self.pool.queue
1108
1109 self.SetTaskName("Job%s" % job.id)
1110
1111 proc = mcpu.Processor(queue.context, job.id)
1112
1113 if not _JobProcessor(queue, proc.ExecOpCode, job)():
1114
1115 raise workerpool.DeferTask(priority=job.CalcPriority())
1116
1118 class _JobQueueWorkerPool(workerpool.WorkerPool):
1119 """Simple class implementing a job-processing workerpool.
1120
1121 """
1127
1129 def _RequireOpenQueue(fn):
1130 """Decorator for "public" functions.
1131
1132 This function should be used for all 'public' functions. That is,
1133 functions usually called from other classes. Note that this should
1134 be applied only to methods (not plain functions), since it expects
1135 that the decorated function is called with a first argument that has
1136 a '_queue_filelock' attribute.
1137
1138 @warning: Use this decorator only after locking.ssynchronized
1139
1140 Example::
1141 @locking.ssynchronized(_LOCK)
1142 @_RequireOpenQueue
1143 def Example(self):
1144 pass
1145
1146 """
1147 def wrapper(self, *args, **kwargs):
1148
1149 assert self._queue_filelock is not None, "Queue should be open"
1150 return fn(self, *args, **kwargs)
1151 return wrapper
1152
1154 class JobQueue(object):
1155 """Queue used to manage the jobs.
1156
1157 @cvar _RE_JOB_FILE: regex matching the valid job file names
1158
1159 """
1160 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1161
1162 def __init__(self, context):
1163 """Constructor for JobQueue.
1164
1165 The constructor will initialize the job queue object and then
1166 start loading the current jobs from disk, either for starting them
1167 (if they were queued) or for aborting them (if they were already
1168 running).
1169
1170 @type context: GanetiContext
1171 @param context: the context object for access to the configuration
1172 data and other ganeti objects
1173
1174 """
1175 self.context = context
1176 self._memcache = weakref.WeakValueDictionary()
1177 self._my_hostname = netutils.Hostname.GetSysName()
1178
1179
1180
1181
1182
1183
1184 self._lock = locking.SharedLock("JobQueue")
1185
1186 self.acquire = self._lock.acquire
1187 self.release = self._lock.release
1188
1189
1190
1191 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1192
1193
1194 self._last_serial = jstore.ReadSerial()
1195 assert self._last_serial is not None, ("Serial file was modified between"
1196 " check in jstore and here")
1197
1198
1199 self._nodes = dict((n.name, n.primary_ip)
1200 for n in self.context.cfg.GetAllNodesInfo().values()
1201 if n.master_candidate)
1202
1203
1204 self._nodes.pop(self._my_hostname, None)
1205
1206
1207
1208 self._queue_size = 0
1209 self._UpdateQueueSizeUnlocked()
1210 self._drained = self._IsQueueMarkedDrain()
1211
1212
1213 self._wpool = _JobQueueWorkerPool(self)
1214 try:
1215 self._InspectQueue()
1216 except:
1217 self._wpool.TerminateWorkers()
1218 raise
1219
1220 @locking.ssynchronized(_LOCK)
1221 @_RequireOpenQueue
1222 def _InspectQueue(self):
1223 """Loads the whole job queue and resumes unfinished jobs.
1224
1225 This function needs the lock here because WorkerPool.AddTask() may start a
1226 job while we're still doing our work.
1227
1228 """
1229 logging.info("Inspecting job queue")
1230
1231 restartjobs = []
1232
1233 all_job_ids = self._GetJobIDsUnlocked()
1234 jobs_count = len(all_job_ids)
1235 lastinfo = time.time()
1236 for idx, job_id in enumerate(all_job_ids):
1237
1238 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1239 idx == (jobs_count - 1)):
1240 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1241 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1242 lastinfo = time.time()
1243
1244 job = self._LoadJobUnlocked(job_id)
1245
1246
1247 if job is None:
1248 continue
1249
1250 status = job.CalcStatus()
1251
1252 if status == constants.JOB_STATUS_QUEUED:
1253 restartjobs.append(job)
1254
1255 elif status in (constants.JOB_STATUS_RUNNING,
1256 constants.JOB_STATUS_WAITLOCK,
1257 constants.JOB_STATUS_CANCELING):
1258 logging.warning("Unfinished job %s found: %s", job.id, job)
1259
1260 if status == constants.JOB_STATUS_WAITLOCK:
1261
1262 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1263 restartjobs.append(job)
1264 else:
1265 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1266 "Unclean master daemon shutdown")
1267
1268 self.UpdateJobUnlocked(job)
1269
1270 if restartjobs:
1271 logging.info("Restarting %s jobs", len(restartjobs))
1272 self._EnqueueJobs(restartjobs)
1273
1274 logging.info("Job queue inspection finished")
1275
1276 @locking.ssynchronized(_LOCK)
1277 @_RequireOpenQueue
1278 def AddNode(self, node):
1279 """Register a new node with the queue.
1280
1281 @type node: L{objects.Node}
1282 @param node: the node object to be added
1283
1284 """
1285 node_name = node.name
1286 assert node_name != self._my_hostname
1287
1288
1289 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1290 msg = result.fail_msg
1291 if msg:
1292 logging.warning("Cannot cleanup queue directory on node %s: %s",
1293 node_name, msg)
1294
1295 if not node.master_candidate:
1296
1297 self._nodes.pop(node_name, None)
1298
1299 return
1300
1301
1302 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1303
1304
1305 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1306
1307 for file_name in files:
1308
1309 content = utils.ReadFile(file_name)
1310
1311 result = rpc.RpcRunner.call_jobqueue_update([node_name],
1312 [node.primary_ip],
1313 file_name, content)
1314 msg = result[node_name].fail_msg
1315 if msg:
1316 logging.error("Failed to upload file %s to node %s: %s",
1317 file_name, node_name, msg)
1318
1319 self._nodes[node_name] = node.primary_ip
1320
1321 @locking.ssynchronized(_LOCK)
1322 @_RequireOpenQueue
1323 def RemoveNode(self, node_name):
1324 """Callback called when removing nodes from the cluster.
1325
1326 @type node_name: str
1327 @param node_name: the name of the node to remove
1328
1329 """
1330 self._nodes.pop(node_name, None)
1331
1332 @staticmethod
1333 def _CheckRpcResult(result, nodes, failmsg):
1334 """Verifies the status of an RPC call.
1335
1336 Since we aim to keep consistency should this node (the current
1337 master) fail, we log errors if our RPC calls fail, and especially
1338 log the case when more than half of the nodes fail.
1339
1340 @param result: the data as returned from the rpc call
1341 @type nodes: list
1342 @param nodes: the list of nodes we made the call to
1343 @type failmsg: str
1344 @param failmsg: the identifier to be used for logging
1345
1346 """
1347 failed = []
1348 success = []
1349
1350 for node in nodes:
1351 msg = result[node].fail_msg
1352 if msg:
1353 failed.append(node)
1354 logging.error("RPC call %s (%s) failed on node %s: %s",
1355 result[node].call, failmsg, node, msg)
1356 else:
1357 success.append(node)
1358
1359
1360 if (len(success) + 1) < len(failed):
1361
1362 logging.error("More than half of the nodes failed")
1363
1364 def _GetNodeIp(self):
1365 """Helper for returning the node name/ip list.
1366
1367 @rtype: (list, list)
1368 @return: a tuple of two lists, the first one with the node
1369 names and the second one with the node addresses
1370
1371 """
1372
1373 name_list = self._nodes.keys()
1374 addr_list = [self._nodes[name] for name in name_list]
1375 return name_list, addr_list
1376
1377 def _UpdateJobQueueFile(self, file_name, data, replicate):
1378 """Writes a file locally and then replicates it to all nodes.
1379
1380 This function will replace the contents of a file on the local
1381 node and then replicate it to all the other nodes we have.
1382
1383 @type file_name: str
1384 @param file_name: the path of the file to be replicated
1385 @type data: str
1386 @param data: the new contents of the file
1387 @type replicate: boolean
1388 @param replicate: whether to spread the changes to the remote nodes
1389
1390 """
1391 getents = runtime.GetEnts()
1392 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1393 gid=getents.masterd_gid)
1394
1395 if replicate:
1396 names, addrs = self._GetNodeIp()
1397 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1398 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1399
1400 def _RenameFilesUnlocked(self, rename):
1401 """Renames files locally and then replicates the changes.
1402
1403 This function will rename a file in the local queue directory
1404 and then replicate this rename to all the other nodes we have.
1405
1406 @type rename: list of (old, new)
1407 @param rename: List containing tuples mapping old to new names
1408
1409 """
1410
1411 for old, new in rename:
1412 utils.RenameFile(old, new, mkdir=True)
1413
1414
1415 names, addrs = self._GetNodeIp()
1416 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1417 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1418
1419 @staticmethod
1439
1440 @classmethod
1441 def _GetArchiveDirectory(cls, job_id):
1442 """Returns the archive directory for a job.
1443
1444 @type job_id: str
1445 @param job_id: Job identifier
1446 @rtype: str
1447 @return: Directory name
1448
1449 """
1450 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1451
1452 def _NewSerialsUnlocked(self, count):
1453 """Generates new job identifiers.
1454
1455 Job identifiers are unique during the lifetime of a cluster.
1456
1457 @type count: integer
1458 @param count: how many serials to return
1459 @rtype: list of str
1460 @return: a list of strings representing the job identifiers
1461
1462 """
1463 assert count > 0
1464
1465 serial = self._last_serial + count
1466
1467
1468 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1469 "%s\n" % serial, True)
1470
1471 result = [self._FormatJobID(v)
1472 for v in range(self._last_serial + 1, serial + 1)]
1473
1474 self._last_serial = serial
1475
1476 return result
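# Worked example (illustrative): with _last_serial == 3 and count == 2 the new
# serial written to disk is 5, and the IDs handed out correspond to 4 and 5:
#
#   serial = 3 + 2                      # 5
#   range(3 + 1, 5 + 1) == [4, 5]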
1477
1478 @staticmethod
1479 def _GetJobPath(job_id):
1480 """Returns the job file for a given job id.
1481
1482 @type job_id: str
1483 @param job_id: the job identifier
1484 @rtype: str
1485 @return: the path to the job file
1486
1487 """
1488 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1489
1490 @classmethod
1491 def _GetArchivedJobPath(cls, job_id):
1492 """Returns the archived job file for a given job id.
1493
1494 @type job_id: str
1495 @param job_id: the job identifier
1496 @rtype: str
1497 @return: the path to the archived job file
1498
1499 """
1500 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1501 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1502
1503 def _GetJobIDsUnlocked(self, sort=True):
1504 """Return all known job IDs.
1505
1506 The method only looks at disk because it's a requirement that all
1507 jobs are present on disk (so in the _memcache we don't have any
1508 extra IDs).
1509
1510 @type sort: boolean
1511 @param sort: perform sorting on the returned job ids
1512 @rtype: list
1513 @return: the list of job IDs
1514
1515 """
1516 jlist = []
1517 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1518 m = self._RE_JOB_FILE.match(filename)
1519 if m:
1520 jlist.append(m.group(1))
1521 if sort:
1522 jlist = utils.NiceSort(jlist)
1523 return jlist
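# Illustrative check (the exact pattern comes from constants.JOB_ID_TEMPLATE;
# a plain numeric template is assumed here): only files named like "job-<id>"
# are picked up, and the captured numeric part is what ends up in the list.
#
#   re.match(r"^job-(\d+)$", "job-123").group(1) == "123"
#   re.match(r"^job-(\d+)$", "job-123.lock") is None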
1524
1525 def _LoadJobUnlocked(self, job_id):
1526 """Loads a job from the disk or memory.
1527
1528 Given a job id, this will return the cached job object if
1529 existing, or try to load the job from the disk. If loading from
1530 disk, it will also add the job to the cache.
1531
1532 @param job_id: the job id
1533 @rtype: L{_QueuedJob} or None
1534 @return: either None or the job object
1535
1536 """
1537 job = self._memcache.get(job_id, None)
1538 if job:
1539 logging.debug("Found job %s in memcache", job_id)
1540 return job
1541
1542 try:
1543 job = self._LoadJobFromDisk(job_id)
1544 if job is None:
1545 return job
1546 except errors.JobFileCorrupted:
1547 old_path = self._GetJobPath(job_id)
1548 new_path = self._GetArchivedJobPath(job_id)
1549 if old_path == new_path:
1550
1551 logging.exception("Can't parse job %s", job_id)
1552 else:
1553
1554 logging.exception("Can't parse job %s, will archive.", job_id)
1555 self._RenameFilesUnlocked([(old_path, new_path)])
1556 return None
1557
1558 self._memcache[job_id] = job
1559 logging.debug("Added job %s to the cache", job_id)
1560 return job
1561
1562 def _LoadJobFromDisk(self, job_id):
1563 """Load the given job file from disk.
1564
1565 Given a job file, read, load and restore it in a _QueuedJob format.
1566
1567 @type job_id: string
1568 @param job_id: job identifier
1569 @rtype: L{_QueuedJob} or None
1570 @return: either None or the job object
1571
1572 """
1573 filepath = self._GetJobPath(job_id)
1574 logging.debug("Loading job from %s", filepath)
1575 try:
1576 raw_data = utils.ReadFile(filepath)
1577 except EnvironmentError, err:
1578 if err.errno in (errno.ENOENT, ):
1579 return None
1580 raise
1581
1582 try:
1583 data = serializer.LoadJson(raw_data)
1584 job = _QueuedJob.Restore(self, data)
1585 except Exception, err:
1586 raise errors.JobFileCorrupted(err)
1587
1588 return job
1589
1590 def SafeLoadJobFromDisk(self, job_id):
1591 """Load the given job file from disk.
1592
1593 Given a job file, read, load and restore it in a _QueuedJob format.
1594 In case of error reading the job, it gets returned as None, and the
1595 exception is logged.
1596
1597 @type job_id: string
1598 @param job_id: job identifier
1599 @rtype: L{_QueuedJob} or None
1600 @return: either None or the job object
1601
1602 """
1603 try:
1604 return self._LoadJobFromDisk(job_id)
1605 except (errors.JobFileCorrupted, EnvironmentError):
1606 logging.exception("Can't load/parse job %s", job_id)
1607 return None
1608
1609 @staticmethod
1610 def _IsQueueMarkedDrain():
1611 """Check if the queue is marked for draining.
1612
1613 This currently uses the queue drain file, which makes it a
1614 per-node flag. In the future this can be moved to the config file.
1615
1616 @rtype: boolean
1617 @return: True if the job queue is marked for draining
1618
1619 """
1620 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1621
1622 def _UpdateQueueSizeUnlocked(self):
1623 """Update the queue size.
1624
1625 """
1626 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1627
1628 @locking.ssynchronized(_LOCK)
1629 @_RequireOpenQueue
1648
1649 @_RequireOpenQueue
1650 def _SubmitJobUnlocked(self, job_id, ops):
1651 """Create and store a new job.
1652
1653 This enters the job into our job queue and also puts it on the new
1654 queue, in order for it to be picked up by the queue processors.
1655
1656 @type job_id: job ID
1657 @param job_id: the job ID for the new job
1658 @type ops: list
1659 @param ops: The list of OpCodes that will become the new job.
1660 @rtype: L{_QueuedJob}
1661 @return: the job object to be queued
1662 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1663 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1664 @raise errors.GenericError: If an opcode is not valid
1665
1666 """
1667
1668
1669 if self._drained:
1670 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1671
1672 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1673 raise errors.JobQueueFull()
1674
1675 job = _QueuedJob(self, job_id, ops)
1676
1677
1678 for idx, op in enumerate(job.ops):
1679 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1680 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1681 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1682 " are %s" % (idx, op.priority, allowed))
1683
1684
1685 self.UpdateJobUnlocked(job)
1686
1687 self._queue_size += 1
1688
1689 logging.debug("Adding new job %s to the cache", job_id)
1690 self._memcache[job_id] = job
1691
1692 return job
1693
1694 @locking.ssynchronized(_LOCK)
1695 @_RequireOpenQueue
1705
1706 @locking.ssynchronized(_LOCK)
1707 @_RequireOpenQueue
1708 def SubmitManyJobs(self, jobs):
1709 """Create and store multiple jobs.
1710
1711 @see: L{_SubmitJobUnlocked}
1712
1713 """
1714 results = []
1715 added_jobs = []
1716 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1717 for job_id, ops in zip(all_job_ids, jobs):
1718 try:
1719 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1720 status = True
1721 data = job_id
1722 except errors.GenericError, err:
1723 data = str(err)
1724 status = False
1725 results.append((status, data))
1726
1727 self._EnqueueJobs(added_jobs)
1728
1729 return results
1730
1731 def _EnqueueJobs(self, jobs):
1732 """Helper function to add jobs to the worker pool's queue.
1733
1734 @type jobs: list
1735 @param jobs: List of all jobs
1736
1737 """
1738 self._wpool.AddManyTasks([(job, ) for job in jobs],
1739 priority=[job.CalcPriority() for job in jobs])
1740
1741 @_RequireOpenQueue
1742 def UpdateJobUnlocked(self, job, replicate=True):
1743 """Update a job's on-disk storage.
1744
1745 After a job has been modified, this function needs to be called in
1746 order to write the changes to disk and replicate them to the other
1747 nodes.
1748
1749 @type job: L{_QueuedJob}
1750 @param job: the changed job
1751 @type replicate: boolean
1752 @param replicate: whether to replicate the change to remote nodes
1753
1754 """
1755 filename = self._GetJobPath(job.id)
1756 data = serializer.DumpJson(job.Serialize(), indent=False)
1757 logging.debug("Writing job %s to %s", job.id, filename)
1758 self._UpdateJobQueueFile(filename, data, replicate)
1759
1760 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1761 timeout):
1762 """Waits for changes in a job.
1763
1764 @type job_id: string
1765 @param job_id: Job identifier
1766 @type fields: list of strings
1767 @param fields: Which fields to check for changes
1768 @type prev_job_info: list or None
1769 @param prev_job_info: Last job information returned
1770 @type prev_log_serial: int
1771 @param prev_log_serial: Last job message serial number
1772 @type timeout: float
1773 @param timeout: maximum time to wait in seconds
1774 @rtype: tuple (job info, log entries)
1775 @return: a tuple of the job information as required via
1776 the fields parameter, and the log entries as a list
1777
1778 if the job has not changed and the timeout has expired,
1779 we instead return a special value,
1780 L{constants.JOB_NOTCHANGED}, which should be interpreted
1781 as such by the clients
1782
1783 """
1784 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1785
1786 helper = _WaitForJobChangesHelper()
1787
1788 return helper(self._GetJobPath(job_id), load_fn,
1789 fields, prev_job_info, prev_log_serial, timeout)
1790
1791 @locking.ssynchronized(_LOCK)
1792 @_RequireOpenQueue
1793 def CancelJob(self, job_id):
1794 """Cancels a job.
1795
1796 This will only succeed if the job has not started yet.
1797
1798 @type job_id: string
1799 @param job_id: job ID of job to be cancelled.
1800
1801 """
1802 logging.info("Cancelling job %s", job_id)
1803
1804 job = self._LoadJobUnlocked(job_id)
1805 if not job:
1806 logging.debug("Job %s not found", job_id)
1807 return (False, "Job %s not found" % job_id)
1808
1809 (success, msg) = job.Cancel()
1810
1811 if success:
1812 self.UpdateJobUnlocked(job)
1813
1814 return (success, msg)
1815
1816 @_RequireOpenQueue
1817 def _ArchiveJobsUnlocked(self, jobs):
1818 """Archives jobs.
1819
1820 @type jobs: list of L{_QueuedJob}
1821 @param jobs: Job objects
1822 @rtype: int
1823 @return: Number of archived jobs
1824
1825 """
1826 archive_jobs = []
1827 rename_files = []
1828 for job in jobs:
1829 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1830 logging.debug("Job %s is not yet done", job.id)
1831 continue
1832
1833 archive_jobs.append(job)
1834
1835 old = self._GetJobPath(job.id)
1836 new = self._GetArchivedJobPath(job.id)
1837 rename_files.append((old, new))
1838
1839
1840 self._RenameFilesUnlocked(rename_files)
1841
1842 logging.debug("Successfully archived job(s) %s",
1843 utils.CommaJoin(job.id for job in archive_jobs))
1844
1845
1846
1847
1848
1849 self._UpdateQueueSizeUnlocked()
1850 return len(archive_jobs)
1851
1852 @locking.ssynchronized(_LOCK)
1853 @_RequireOpenQueue
1854 def ArchiveJob(self, job_id):
1855 """Archives a job.
1856
1857 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1858
1859 @type job_id: string
1860 @param job_id: Job ID of job to be archived.
1861 @rtype: bool
1862 @return: Whether job was archived
1863
1864 """
1865 logging.info("Archiving job %s", job_id)
1866
1867 job = self._LoadJobUnlocked(job_id)
1868 if not job:
1869 logging.debug("Job %s not found", job_id)
1870 return False
1871
1872 return self._ArchiveJobsUnlocked([job]) == 1
1873
1874 @locking.ssynchronized(_LOCK)
1875 @_RequireOpenQueue
1876 def AutoArchiveJobs(self, age, timeout):
1877 """Archives all jobs based on age.
1878
1879 The method will archive all jobs which are older than the age
1880 parameter. For jobs that don't have an end timestamp, the start
1881 timestamp will be considered. The special '-1' age will cause
1882 archival of all jobs (that are not running or queued).
1883
1884 @type age: int
1885 @param age: the minimum age in seconds
1886
1887 """
1888 logging.info("Archiving jobs with age more than %s seconds", age)
1889
1890 now = time.time()
1891 end_time = now + timeout
1892 archived_count = 0
1893 last_touched = 0
1894
1895 all_job_ids = self._GetJobIDsUnlocked()
1896 pending = []
1897 for idx, job_id in enumerate(all_job_ids):
1898 last_touched = idx + 1
1899
1900
1901
1902
1903 if time.time() > end_time:
1904 break
1905
1906
1907 job = self._LoadJobUnlocked(job_id)
1908 if job:
1909 if job.end_timestamp is None:
1910 if job.start_timestamp is None:
1911 job_age = job.received_timestamp
1912 else:
1913 job_age = job.start_timestamp
1914 else:
1915 job_age = job.end_timestamp
1916
1917 if age == -1 or now - job_age[0] > age:
1918 pending.append(job)
1919
1920
1921 if len(pending) >= 10:
1922 archived_count += self._ArchiveJobsUnlocked(pending)
1923 pending = []
1924
1925 if pending:
1926 archived_count += self._ArchiveJobsUnlocked(pending)
1927
1928 return (archived_count, len(all_job_ids) - last_touched)
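# Illustration of the age check above (assumed timestamp format from
# TimeStampNow: a (seconds, microseconds) tuple, so only the seconds part is
# compared against the cut-off):
#
#   now = 1300000500.0
#   job_age = (1300000000, 123456)
#   now - job_age[0] > 300    # True: finished more than five minutes ago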
1929
1930 def QueryJobs(self, job_ids, fields):
1931 """Returns a list of jobs in queue.
1932
1933 @type job_ids: list
1934 @param job_ids: sequence of job identifiers or None for all
1935 @type fields: list
1936 @param fields: names of fields to return
1937 @rtype: list
1938 @return: list one element per job, each element being list with
1939 the requested fields
1940
1941 """
1942 jobs = []
1943 list_all = False
1944 if not job_ids:
1945
1946
1947 job_ids = self._GetJobIDsUnlocked()
1948 list_all = True
1949
1950 for job_id in job_ids:
1951 job = self.SafeLoadJobFromDisk(job_id)
1952 if job is not None:
1953 jobs.append(job.GetInfo(fields))
1954 elif not list_all:
1955 jobs.append(None)
1956
1957 return jobs
1958
1959 @locking.ssynchronized(_LOCK)
1960 @_RequireOpenQueue
1961 def Shutdown(self):
1962 """Stops the job queue.
1963
1964 This shuts down all the worker threads and closes the queue.
1965
1966 """
1967 self._wpool.TerminateWorkers()
1968
1969 self._queue_filelock.Close()
1970 self._queue_filelock = None
1971