22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """

import os
import logging
import errno
import re
import time
import weakref

try:
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


_LOCK = "_lock"
_QUEUE = "_queue"
69 """Special exception to cancel a job.
70
71 """
72
75 """Returns the current timestamp.
76
77 @rtype: tuple
78 @return: the current time in the (seconds, microseconds) format
79
80 """
81 return utils.SplitTime(time.time())
82
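# For illustration: TimeStampNow() splits time.time() into whole seconds and
# microseconds, so a call made at Unix time 1291386247.123456 would return
# roughly (1291386247, 123456); all the *_timestamp fields below use this
# format (the numbers here are made-up examples).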
85 """Encapsulates an opcode object.
86
87 @ivar log: holds the execution log and consists of tuples
88 of the form C{(log_serial, timestamp, level, message)}
89 @ivar input: the OpCode we encapsulate
90 @ivar status: the current status
91 @ivar result: the result of the LU execution
92 @ivar start_timestamp: timestamp for the start of the execution
93 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94 @ivar stop_timestamp: timestamp for the end of the execution
95
96 """
97 __slots__ = ["input", "status", "result", "log",
98 "start_timestamp", "exec_timestamp", "end_timestamp",
99 "__weakref__"]
100
102 """Constructor for the _QuededOpCode.
103
104 @type op: L{opcodes.OpCode}
105 @param op: the opcode we encapsulate
106
107 """
108 self.input = op
109 self.status = constants.OP_STATUS_QUEUED
110 self.result = None
111 self.log = []
112 self.start_timestamp = None
113 self.exec_timestamp = None
114 self.end_timestamp = None
115
116 @classmethod
118 """Restore the _QueuedOpCode from the serialized form.
119
120 @type state: dict
121 @param state: the serialized state
122 @rtype: _QueuedOpCode
123 @return: a new _QueuedOpCode instance
124
125 """
126 obj = _QueuedOpCode.__new__(cls)
127 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128 obj.status = state["status"]
129 obj.result = state["result"]
130 obj.log = state["log"]
131 obj.start_timestamp = state.get("start_timestamp", None)
132 obj.exec_timestamp = state.get("exec_timestamp", None)
133 obj.end_timestamp = state.get("end_timestamp", None)
134 return obj
135
137 """Serializes this _QueuedOpCode.
138
139 @rtype: dict
140 @return: the dictionary holding the serialized state
141
142 """
143 return {
144 "input": self.input.__getstate__(),
145 "status": self.status,
146 "result": self.result,
147 "log": self.log,
148 "start_timestamp": self.start_timestamp,
149 "exec_timestamp": self.exec_timestamp,
150 "end_timestamp": self.end_timestamp,
151 }
152
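# For illustration, a sketch of how Serialize() and Restore() round-trip an
# encapsulated opcode; "some_op" stands for any opcodes.OpCode instance:
#
#   queued_op = _QueuedOpCode(some_op)
#   state = queued_op.Serialize()          # plain, JSON-serializable dict
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.status == constants.OP_STATUS_QUEUED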
155 """In-memory job representation.
156
157 This is what we use to track the user-submitted jobs. Locking must
158 be taken care of by users of this class.
159
160 @type queue: L{JobQueue}
161 @ivar queue: the parent queue
162 @ivar id: the job ID
163 @type ops: list
164 @ivar ops: the list of _QueuedOpCode that constitute the job
165 @type log_serial: int
166 @ivar log_serial: holds the index for the next log entry
167 @ivar received_timestamp: the timestamp for when the job was received
168 @ivar start_timestmap: the timestamp for start of execution
169 @ivar end_timestamp: the timestamp for end of execution
170
171 """
172
173 __slots__ = ["queue", "id", "ops", "log_serial",
174 "received_timestamp", "start_timestamp", "end_timestamp",
175 "__weakref__"]
176
177 - def __init__(self, queue, job_id, ops):
178 """Constructor for the _QueuedJob.
179
180 @type queue: L{JobQueue}
181 @param queue: our parent queue
182 @type job_id: job_id
183 @param job_id: our job id
184 @type ops: list
185 @param ops: the list of opcodes we hold, which will be encapsulated
186 in _QueuedOpCodes
187
188 """
189 if not ops:
190 raise errors.GenericError("A job needs at least one opcode")
191
192 self.queue = queue
193 self.id = job_id
194 self.ops = [_QueuedOpCode(op) for op in ops]
195 self.log_serial = 0
196 self.received_timestamp = TimeStampNow()
197 self.start_timestamp = None
198 self.end_timestamp = None
199
201 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202 "id=%s" % self.id,
203 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204
205 return "<%s at %#x>" % (" ".join(status), id(self))
206
  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }


  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

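  # For illustration, with (made-up) op.log entries such as
  #   (1, (1291386247, 0), constants.ELOG_MESSAGE, "step 1")
  #   (2, (1291386300, 0), constants.ELOG_MESSAGE, "step 2")
  # GetLogEntries(None) returns both tuples, while GetLogEntries(1) returns
  # only the second one, since only its serial is greater than 1.
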
327 """Returns information about a job.
328
329 @type fields: list
330 @param fields: names of fields to return
331 @rtype: list
332 @return: list with one element for each field
333 @raise errors.OpExecError: when an invalid field
334 has been passed
335
336 """
337 row = []
338 for fname in fields:
339 if fname == "id":
340 row.append(self.id)
341 elif fname == "status":
342 row.append(self.CalcStatus())
343 elif fname == "ops":
344 row.append([op.input.__getstate__() for op in self.ops])
345 elif fname == "opresult":
346 row.append([op.result for op in self.ops])
347 elif fname == "opstatus":
348 row.append([op.status for op in self.ops])
349 elif fname == "oplog":
350 row.append([op.log for op in self.ops])
351 elif fname == "opstart":
352 row.append([op.start_timestamp for op in self.ops])
353 elif fname == "opexec":
354 row.append([op.exec_timestamp for op in self.ops])
355 elif fname == "opend":
356 row.append([op.end_timestamp for op in self.ops])
357 elif fname == "received_ts":
358 row.append(self.received_timestamp)
359 elif fname == "start_ts":
360 row.append(self.start_timestamp)
361 elif fname == "end_ts":
362 row.append(self.end_timestamp)
363 elif fname == "summary":
364 row.append([op.input.Summary() for op in self.ops])
365 else:
366 raise errors.OpExecError("Invalid self query field '%s'" % fname)
367 return row
368
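  # For illustration (all values made up): on a one-opcode job,
  # GetInfo(["id", "status", "summary"]) might return
  # ["123", constants.JOB_STATUS_QUEUED, ["CLUSTER_VERIFY"]] -- one element
  # per requested field, in the same order as the fields list.
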
370 """Mark unfinished opcodes with a given status and result.
371
372 This is an utility function for marking all running or waiting to
373 be run opcodes with a given status. Opcodes which are already
374 finalised are not changed.
375
376 @param status: a given opcode status
377 @param result: the opcode result
378
379 """
380 not_marked = True
381 for op in self.ops:
382 if op.status in constants.OPS_FINALIZED:
383 assert not_marked, "Finalized opcodes found after non-finalized ones"
384 continue
385 op.status = status
386 op.result = result
387 not_marked = False
388
392 """Initializes this class.
393
394 @type queue: L{JobQueue}
395 @param queue: Job queue
396 @type job: L{_QueuedJob}
397 @param job: Job object
398 @type op: L{_QueuedOpCode}
399 @param op: OpCode
400
401 """
402 assert queue, "Queue is missing"
403 assert job, "Job is missing"
404 assert op, "Opcode is missing"
405
406 self._queue = queue
407 self._job = job
408 self._op = op
409
411 """Raises an exception to cancel the job if asked to.
412
413 """
414
415 if self._op.status == constants.OP_STATUS_CANCELING:
416 logging.debug("Canceling opcode")
417 raise CancelJob()
418
419 @locking.ssynchronized(_QUEUE, shared=1)
421 """Mark the opcode as running, not lock-waiting.
422
423 This is called from the mcpu code as a notifier function, when the LU is
424 finally about to start the Exec() method. Of course, to have end-user
425 visible results, the opcode must be initially (before calling into
426 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
427
428 """
429 assert self._op in self._job.ops
430 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
431 constants.OP_STATUS_CANCELING)
432
433
434 self._CheckCancel()
435
436 logging.debug("Opcode is now running")
437
438 self._op.status = constants.OP_STATUS_RUNNING
439 self._op.exec_timestamp = TimeStampNow()
440
441
442 self._queue.UpdateJobUnlocked(self._job)
443
444 @locking.ssynchronized(_QUEUE, shared=1)
446 """Internal feedback append function, with locks
447
448 """
449 self._job.log_serial += 1
450 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
451 self._queue.UpdateJobUnlocked(self._job, replicate=False)
452

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()

585 """Initializes this class.
586
587 @type filename: string
588 @param filename: Path to job file
589
590 """
591 self._filewaiter = None
592 self._filename = filename
593
594 - def Wait(self, timeout):
595 """Waits for a job to change.
596
597 @type timeout: float
598 @param timeout: Timeout in seconds
599 @return: Whether there have been events
600
601 """
602 if self._filewaiter:
603 return self._filewaiter.Wait(timeout)
604
605
606
607
608
609 self._filewaiter = _JobFileChangesWaiter(self._filename)
610
611 return True
612
614 """Closes underlying waiter.
615
616 """
617 if self._filewaiter:
618 self._filewaiter.Close()
619
622 """Helper class using inotify to wait for changes in a job file.
623
624 This class takes a previous job status and serial, and alerts the client when
625 the current job status has changed.
626
627 """
628 @staticmethod
630 job = job_load_fn()
631 if not job:
632 raise errors.JobLost()
633
634 result = check_fn(job)
635 if result is None:
636 raise utils.RetryAgain()
637
638 return result
639
640 - def __call__(self, filename, job_load_fn,
641 fields, prev_job_info, prev_log_serial, timeout):
642 """Waits for changes on a job.
643
644 @type filename: string
645 @param filename: File on which to wait for changes
646 @type job_load_fn: callable
647 @param job_load_fn: Function to load job
648 @type fields: list of strings
649 @param fields: Which fields to check for changes
650 @type prev_job_info: list or None
651 @param prev_job_info: Last job information returned
652 @type prev_log_serial: int
653 @param prev_log_serial: Last job message serial number
654 @type timeout: float
655 @param timeout: maximum time to wait in seconds
656
657 """
658 try:
659 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
660 waiter = _JobChangesWaiter(filename)
661 try:
662 return utils.Retry(compat.partial(self._CheckForChanges,
663 job_load_fn, check_fn),
664 utils.RETRY_REMAINING_TIME, timeout,
665 wait_fn=waiter.Wait)
666 finally:
667 waiter.Close()
668 except (errors.InotifyError, errors.JobLost):
669 return None
670 except utils.RetryTimeout:
671 return constants.JOB_NOTCHANGED
672
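# For illustration: __call__ above has three possible outcomes -- the value
# produced by _JobChangesChecker once the job changes, None if the job file
# disappears or inotify cannot be set up, and constants.JOB_NOTCHANGED if the
# timeout expires first; callers are expected to distinguish these cases.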


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0:
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)

815 """Simple class implementing a job-processing workerpool.
816
817 """
823
826 """Decorator for "public" functions.
827
828 This function should be used for all 'public' functions. That is,
829 functions usually called from other classes. Note that this should
830 be applied only to methods (not plain functions), since it expects
831 that the decorated function is called with a first argument that has
832 a '_queue_filelock' argument.
833
834 @warning: Use this decorator only after locking.ssynchronized
835
836 Example::
837 @locking.ssynchronized(_LOCK)
838 @_RequireOpenQueue
839 def Example(self):
840 pass
841
842 """
843 def wrapper(self, *args, **kwargs):
844
845 assert self._queue_filelock is not None, "Queue should be open"
846 return fn(self, *args, **kwargs)
847 return wrapper
848
851 """Queue used to manage the jobs.
852
853 @cvar _RE_JOB_FILE: regex matching the valid job file names
854
855 """
856 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
857
859 """Constructor for JobQueue.
860
861 The constructor will initialize the job queue object and then
862 start loading the current jobs from disk, either for starting them
863 (if they were queue) or for aborting them (if they were already
864 running).
865
866 @type context: GanetiContext
867 @param context: the context object for access to the configuration
868 data and other ganeti objects
869
870 """
871 self.context = context
872 self._memcache = weakref.WeakValueDictionary()
873 self._my_hostname = netutils.HostInfo().name
874
875
876
877
878
879
880 self._lock = locking.SharedLock("JobQueue")
881
882 self.acquire = self._lock.acquire
883 self.release = self._lock.release
884
885
886
887 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
888
889
890 self._last_serial = jstore.ReadSerial()
891 assert self._last_serial is not None, ("Serial file was modified between"
892 " check in jstore and here")
893
894
895 self._nodes = dict((n.name, n.primary_ip)
896 for n in self.context.cfg.GetAllNodesInfo().values()
897 if n.master_candidate)
898
899
900 self._nodes.pop(self._my_hostname, None)
901
902
903
904 self._queue_size = 0
905 self._UpdateQueueSizeUnlocked()
906 self._drained = self._IsQueueMarkedDrain()
907
908
909 self._wpool = _JobQueueWorkerPool(self)
910 try:
911 self._InspectQueue()
912 except:
913 self._wpool.TerminateWorkers()
914 raise
915
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      if job is None:
        continue

      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask((job, ))

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        self.UpdateJobUnlocked(job)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      self._nodes.pop(node_name, None)
      return

    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    if (len(success) + 1) < len(failed):
      logging.error("More than half of the nodes failed")
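    # For illustration (node names made up): with nodes=["node2", "node3",
    # "node4", "node5"] and only "node2" succeeding, len(success) + 1 == 2
    # (the "+ 1" presumably accounts for the current master, which is never
    # part of self._nodes) is smaller than len(failed) == 3, so the error
    # above is logged.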
1046
1048 """Helper for returning the node name/ip list.
1049
1050 @rtype: (list, list)
1051 @return: a tuple of two lists, the first one with the node
1052 names and the second one with the node addresses
1053
1054 """
1055
1056 name_list = self._nodes.keys()
1057 addr_list = [self._nodes[name] for name in name_list]
1058 return name_list, addr_list
1059
1061 """Writes a file locally and then replicates it to all nodes.
1062
1063 This function will replace the contents of a file on the local
1064 node and then replicate it to all the other nodes we have.
1065
1066 @type file_name: str
1067 @param file_name: the path of the file to be replicated
1068 @type data: str
1069 @param data: the new contents of the file
1070 @type replicate: boolean
1071 @param replicate: whether to spread the changes to the remote nodes
1072
1073 """
1074 utils.WriteFile(file_name, data=data)
1075
1076 if replicate:
1077 names, addrs = self._GetNodeIp()
1078 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1079 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1080
1082 """Renames a file locally and then replicate the change.
1083
1084 This function will rename a file in the local queue directory
1085 and then replicate this rename to all the other nodes we have.
1086
1087 @type rename: list of (old, new)
1088 @param rename: List containing tuples mapping old to new names
1089
1090 """
1091
1092 for old, new in rename:
1093 utils.RenameFile(old, new, mkdir=True)
1094
1095
1096 names, addrs = self._GetNodeIp()
1097 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1098 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1099
  @staticmethod

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

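  # For illustration: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345"
  # maps to archive directory "1" and job "9999" to "0", spreading archived
  # job files across numbered subdirectories.
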
1134 """Generates a new job identifier.
1135
1136 Job identifiers are unique during the lifetime of a cluster.
1137
1138 @type count: integer
1139 @param count: how many serials to return
1140 @rtype: str
1141 @return: a string representing the job identifier.
1142
1143 """
1144 assert count > 0
1145
1146 serial = self._last_serial + count
1147
1148
1149 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1150 "%s\n" % serial, True)
1151
1152 result = [self._FormatJobID(v)
1153 for v in range(self._last_serial, serial + 1)]
1154
1155 self._last_serial = serial
1156
1157 return result
1158
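  # For illustration (serial values made up): with _last_serial == 42,
  # _NewSerialsUnlocked(2) writes "44" to the serial file and returns
  # ["43", "44"], leaving _last_serial == 44 for the next caller.
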
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

1185 """Return all known job IDs.
1186
1187 The method only looks at disk because it's a requirement that all
1188 jobs are present on disk (so in the _memcache we don't have any
1189 extra IDs).
1190
1191 @type sort: boolean
1192 @param sort: perform sorting on the returned job ids
1193 @rtype: list
1194 @return: the list of job IDs
1195
1196 """
1197 jlist = []
1198 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1199 m = self._RE_JOB_FILE.match(filename)
1200 if m:
1201 jlist.append(m.group(1))
1202 if sort:
1203 jlist = utils.NiceSort(jlist)
1204 return jlist
1205
1207 """Loads a job from the disk or memory.
1208
1209 Given a job id, this will return the cached job object if
1210 existing, or try to load the job from the disk. If loading from
1211 disk, it will also add the job to the cache.
1212
1213 @param job_id: the job id
1214 @rtype: L{_QueuedJob} or None
1215 @return: either None or the job object
1216
1217 """
1218 job = self._memcache.get(job_id, None)
1219 if job:
1220 logging.debug("Found job %s in memcache", job_id)
1221 return job
1222
1223 try:
1224 job = self._LoadJobFromDisk(job_id)
1225 if job is None:
1226 return job
1227 except errors.JobFileCorrupted:
1228 old_path = self._GetJobPath(job_id)
1229 new_path = self._GetArchivedJobPath(job_id)
1230 if old_path == new_path:
1231
1232 logging.exception("Can't parse job %s", job_id)
1233 else:
1234
1235 logging.exception("Can't parse job %s, will archive.", job_id)
1236 self._RenameFilesUnlocked([(old_path, new_path)])
1237 return None
1238
1239 self._memcache[job_id] = job
1240 logging.debug("Added job %s to the cache", job_id)
1241 return job
1242
1244 """Load the given job file from disk.
1245
1246 Given a job file, read, load and restore it in a _QueuedJob format.
1247
1248 @type job_id: string
1249 @param job_id: job identifier
1250 @rtype: L{_QueuedJob} or None
1251 @return: either None or the job object
1252
1253 """
1254 filepath = self._GetJobPath(job_id)
1255 logging.debug("Loading job from %s", filepath)
1256 try:
1257 raw_data = utils.ReadFile(filepath)
1258 except EnvironmentError, err:
1259 if err.errno in (errno.ENOENT, ):
1260 return None
1261 raise
1262
1263 try:
1264 data = serializer.LoadJson(raw_data)
1265 job = _QueuedJob.Restore(self, data)
1266 except Exception, err:
1267 raise errors.JobFileCorrupted(err)
1268
1269 return job
1270
1272 """Load the given job file from disk.
1273
1274 Given a job file, read, load and restore it in a _QueuedJob format.
1275 In case of error reading the job, it gets returned as None, and the
1276 exception is logged.
1277
1278 @type job_id: string
1279 @param job_id: job identifier
1280 @rtype: L{_QueuedJob} or None
1281 @return: either None or the job object
1282
1283 """
1284 try:
1285 return self._LoadJobFromDisk(job_id)
1286 except (errors.JobFileCorrupted, EnvironmentError):
1287 logging.exception("Can't load/parse job %s", job_id)
1288 return None
1289
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

1304 """Update the queue size.
1305
1306 """
1307 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1308
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the time budget for the archival run, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      if time.time() > end_time:
        break

      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

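  # For illustration (numbers made up): AutoArchiveJobs(7 * 24 * 3600, 60)
  # archives, within a 60 second budget, every finalized job whose relevant
  # timestamp (end, else start, else received) is older than one week;
  # AutoArchiveJobs(-1, 60) archives all finalized jobs regardless of age.
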
1604 """Returns a list of jobs in queue.
1605
1606 @type job_ids: list
1607 @param job_ids: sequence of job identifiers or None for all
1608 @type fields: list
1609 @param fields: names of fields to return
1610 @rtype: list
1611 @return: list one element per job, each element being list with
1612 the requested fields
1613
1614 """
1615 jobs = []
1616 list_all = False
1617 if not job_ids:
1618
1619
1620 job_ids = self._GetJobIDsUnlocked()
1621 list_all = True
1622
1623 for job_id in job_ids:
1624 job = self.SafeLoadJobFromDisk(job_id)
1625 if job is not None:
1626 jobs.append(job.GetInfo(fields))
1627 elif not list_all:
1628 jobs.append(None)
1629
1630 return jobs
1631
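  # For illustration (job ids made up): QueryJobs(["1", "42"], ["id", "status"])
  # returns one row per requested job, with None in place of a job that cannot
  # be loaded; QueryJobs(None, ["id", "status"]) lists every job found on disk
  # and silently skips the ones that fail to load.
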
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None