1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28 processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
56 """Special exception to cancel a job.
57
58 """
59
62 """Returns the current timestamp.
63
64 @rtype: tuple
65 @return: the current time in the (seconds, microseconds) format
66
67 """
68 return utils.SplitTime(time.time())
69
72 """Encapsulates an opcode object.
73
74 @ivar log: holds the execution log and consists of tuples
75 of the form C{(log_serial, timestamp, level, message)}
76 @ivar input: the OpCode we encapsulate
77 @ivar status: the current status
78 @ivar result: the result of the LU execution
79 @ivar start_timestamp: timestamp for the start of the execution
80 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81 @ivar end_timestamp: timestamp for the end of the execution
82
83 """
84 __slots__ = ["input", "status", "result", "log",
85 "start_timestamp", "exec_timestamp", "end_timestamp",
86 "__weakref__"]
87
89 """Constructor for the _QueuedOpCode.
90
91 @type op: L{opcodes.OpCode}
92 @param op: the opcode we encapsulate
93
94 """
95 self.input = op
96 self.status = constants.OP_STATUS_QUEUED
97 self.result = None
98 self.log = []
99 self.start_timestamp = None
100 self.exec_timestamp = None
101 self.end_timestamp = None
102
103 @classmethod
105 """Restore the _QueuedOpCode from the serialized form.
106
107 @type state: dict
108 @param state: the serialized state
109 @rtype: _QueuedOpCode
110 @return: a new _QueuedOpCode instance
111
112 """
113 obj = _QueuedOpCode.__new__(cls)
114 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115 obj.status = state["status"]
116 obj.result = state["result"]
117 obj.log = state["log"]
118 obj.start_timestamp = state.get("start_timestamp", None)
119 obj.exec_timestamp = state.get("exec_timestamp", None)
120 obj.end_timestamp = state.get("end_timestamp", None)
121 return obj
122
124 """Serializes this _QueuedOpCode.
125
126 @rtype: dict
127 @return: the dictionary holding the serialized state
128
129 """
130 return {
131 "input": self.input.__getstate__(),
132 "status": self.status,
133 "result": self.result,
134 "log": self.log,
135 "start_timestamp": self.start_timestamp,
136 "exec_timestamp": self.exec_timestamp,
137 "end_timestamp": self.end_timestamp,
138 }
139
142 """In-memory job representation.
143
144 This is what we use to track the user-submitted jobs. Locking must
145 be taken care of by users of this class.
146
147 @type queue: L{JobQueue}
148 @ivar queue: the parent queue
149 @ivar id: the job ID
150 @type ops: list
151 @ivar ops: the list of _QueuedOpCode that constitute the job
152 @type log_serial: int
153 @ivar log_serial: holds the index for the next log entry
154 @ivar received_timestamp: the timestamp for when the job was received
155 @ivar start_timestamp: the timestamp for start of execution
156 @ivar end_timestamp: the timestamp for end of execution
157 @ivar lock_status: In-memory locking information for debugging
158 @ivar change: a Condition variable we use for waiting for job changes
159
160 """
161
162 __slots__ = ["queue", "id", "ops", "log_serial",
163 "received_timestamp", "start_timestamp", "end_timestamp",
164 "lock_status", "change",
165 "__weakref__"]
166
def __init__(self, queue, job_id, ops):
  """Constructor for the _QueuedJob.

  @type queue: L{JobQueue}
  @param queue: our parent queue
  @type job_id: job ID
  @param job_id: our job id
  @type ops: list
  @param ops: the list of opcodes we hold, which will be encapsulated
      in _QueuedOpCodes
  @raise errors.GenericError: if C{ops} is empty

  """
  if not ops:
    # Raise from the project's exception hierarchy instead of a bare
    # Exception, so that callers which only handle errors.GenericError
    # (such as the multi-job submission loop) can report this job as
    # failed instead of aborting the whole request.
    # NOTE(review): assumes errors.GenericError derives from Exception, so
    # existing "except Exception" handlers keep working.
    raise errors.GenericError("No opcodes")

  self.queue = queue
  self.id = job_id
  self.ops = [_QueuedOpCode(op) for op in ops]
  self.log_serial = 0
  self.received_timestamp = TimeStampNow()
  self.start_timestamp = None
  self.end_timestamp = None

  # In-memory only: lock_status is not part of the serialized job state
  self.lock_status = None

  # Condition used to wait for job changes; it is built on the parent
  # queue's lock, so waiting/notifying requires holding that lock
  self.change = threading.Condition(self.queue._lock)
196
198 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
199 "id=%s" % self.id,
200 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
201
202 return "<%s at %#x>" % (" ".join(status), id(self))
203
204 @classmethod
206 """Restore a _QueuedJob from serialized state:
207
208 @type queue: L{JobQueue}
209 @param queue: to which queue the restored job belongs
210 @type state: dict
211 @param state: the serialized state
212 @rtype: _QueuedJob
213 @return: the restored _QueuedJob instance
214
215 """
216 obj = _QueuedJob.__new__(cls)
217 obj.queue = queue
218 obj.id = state["id"]
219 obj.received_timestamp = state.get("received_timestamp", None)
220 obj.start_timestamp = state.get("start_timestamp", None)
221 obj.end_timestamp = state.get("end_timestamp", None)
222
223
224 obj.lock_status = None
225
226 obj.ops = []
227 obj.log_serial = 0
228 for op_state in state["ops"]:
229 op = _QueuedOpCode.Restore(op_state)
230 for log_entry in op.log:
231 obj.log_serial = max(obj.log_serial, log_entry[0])
232 obj.ops.append(op)
233
234
235 obj.change = threading.Condition(obj.queue._lock)
236
237 return obj
238
240 """Serialize the _QueuedJob instance.
241
242 @rtype: dict
243 @return: the serialized state
244
245 """
246 return {
247 "id": self.id,
248 "ops": [op.Serialize() for op in self.ops],
249 "start_timestamp": self.start_timestamp,
250 "end_timestamp": self.end_timestamp,
251 "received_timestamp": self.received_timestamp,
252 }
253
306
308 """Selectively returns the log entries.
309
310 @type newer_than: None or int
311 @param newer_than: if this is None, return all log entries,
312 otherwise return only the log entries with serial higher
313 than this value
314 @rtype: list
315 @return: the list of the log entries selected
316
317 """
318 if newer_than is None:
319 serial = -1
320 else:
321 serial = newer_than
322
323 entries = []
324 for op in self.ops:
325 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
326
327 return entries
328
330 """Mark unfinished opcodes with a given status and result.
331
332 This is a utility function for marking all running or waiting to
333 be run opcodes with a given status. Opcodes which are already
334 finalised are not changed.
335
336 @param status: a given opcode status
337 @param result: the opcode result
338
339 """
340 not_marked = True
341 for op in self.ops:
342 if op.status in constants.OPS_FINALIZED:
343 assert not_marked, "Finalized opcodes found after non-finalized ones"
344 continue
345 op.status = status
346 op.result = result
347 not_marked = False
348
352 """Initializes this class.
353
354 @type queue: L{JobQueue}
355 @param queue: Job queue
356 @type job: L{_QueuedJob}
357 @param job: Job object
358 @type op: L{_QueuedOpCode}
359 @param op: OpCode
360
361 """
362 assert queue, "Queue is missing"
363 assert job, "Job is missing"
364 assert op, "Opcode is missing"
365
366 self._queue = queue
367 self._job = job
368 self._op = op
369
371 """Mark the opcode as running, not lock-waiting.
372
373 This is called from the mcpu code as a notifier function, when the LU is
374 finally about to start the Exec() method. Of course, to have end-user
375 visible results, the opcode must be initially (before calling into
376 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
377
378 """
379 self._queue.acquire()
380 try:
381 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
382 constants.OP_STATUS_CANCELING)
383
384
385 self._job.lock_status = None
386
387
388 if self._op.status == constants.OP_STATUS_CANCELING:
389 raise CancelJob()
390
391 self._op.status = constants.OP_STATUS_RUNNING
392 self._op.exec_timestamp = TimeStampNow()
393 finally:
394 self._queue.release()
395
397 """Append a log entry.
398
399 """
400 assert len(args) < 3
401
402 if len(args) == 1:
403 log_type = constants.ELOG_MESSAGE
404 log_msg = args[0]
405 else:
406 (log_type, log_msg) = args
407
408
409
410 timestamp = utils.SplitTime(time.time())
411
412 self._queue.acquire()
413 try:
414 self._job.log_serial += 1
415 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
416
417 self._job.change.notifyAll()
418 finally:
419 self._queue.release()
420
422 """Write locking information to the job.
423
424 Called whenever the LU processor is waiting for a lock or has acquired one.
425
426 """
427
428 self._job.lock_status = msg
429
432 """The actual job workers.
433
434 """
436 """Job executor.
437
438 This function processes a job. It is closely tied to the _QueuedJob and
439 _QueuedOpCode classes.
440
441 @type job: L{_QueuedJob}
442 @param job: the job to be processed
443
444 """
445 logging.info("Processing job %s", job.id)
446 proc = mcpu.Processor(self.pool.queue.context, job.id)
447 queue = job.queue
448 try:
449 try:
450 count = len(job.ops)
451 for idx, op in enumerate(job.ops):
452 op_summary = op.input.Summary()
453 if op.status == constants.OP_STATUS_SUCCESS:
454
455
456
457
458
459 logging.info("Op %s/%s: opcode %s already processed, skipping",
460 idx + 1, count, op_summary)
461 continue
462 try:
463 logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
464 op_summary)
465
466 queue.acquire()
467 try:
468 if op.status == constants.OP_STATUS_CANCELED:
469 raise CancelJob()
470 assert op.status == constants.OP_STATUS_QUEUED
471 op.status = constants.OP_STATUS_WAITLOCK
472 op.result = None
473 op.start_timestamp = TimeStampNow()
474 if idx == 0:
475 job.start_timestamp = op.start_timestamp
476 queue.UpdateJobUnlocked(job)
477
478 input_opcode = op.input
479 finally:
480 queue.release()
481
482
483 result = proc.ExecOpCode(input_opcode,
484 _OpExecCallbacks(queue, job, op))
485
486 queue.acquire()
487 try:
488 op.status = constants.OP_STATUS_SUCCESS
489 op.result = result
490 op.end_timestamp = TimeStampNow()
491 queue.UpdateJobUnlocked(job)
492 finally:
493 queue.release()
494
495 logging.info("Op %s/%s: Successfully finished opcode %s",
496 idx + 1, count, op_summary)
497 except CancelJob:
498
499 raise
500 except Exception, err:
501 queue.acquire()
502 try:
503 try:
504 op.status = constants.OP_STATUS_ERROR
505 if isinstance(err, errors.GenericError):
506 op.result = errors.EncodeException(err)
507 else:
508 op.result = str(err)
509 op.end_timestamp = TimeStampNow()
510 logging.info("Op %s/%s: Error in opcode %s: %s",
511 idx + 1, count, op_summary, err)
512 finally:
513 queue.UpdateJobUnlocked(job)
514 finally:
515 queue.release()
516 raise
517
518 except CancelJob:
519 queue.acquire()
520 try:
521 queue.CancelJobUnlocked(job)
522 finally:
523 queue.release()
524 except errors.GenericError, err:
525 logging.exception("Ganeti exception")
526 except:
527 logging.exception("Unhandled exception")
528 finally:
529 queue.acquire()
530 try:
531 try:
532 job.lock_status = None
533 job.end_timestamp = TimeStampNow()
534 queue.UpdateJobUnlocked(job)
535 finally:
536 job_id = job.id
537 status = job.CalcStatus()
538 finally:
539 queue.release()
540
541 logging.info("Finished job %s, status = %s", job_id, status)
542
545 """Simple class implementing a job-processing workerpool.
546
547 """
553
556 """Decorator for "public" functions.
557
558 This function should be used for all 'public' functions. That is,
559 functions usually called from other classes. Note that this should
560 be applied only to methods (not plain functions), since it expects
561 that the decorated function is called with a first argument that has
562 a '_queue_lock' attribute.
563
564 @warning: Use this decorator only after utils.LockedMethod!
565
566 Example::
567 @utils.LockedMethod
568 @_RequireOpenQueue
569 def Example(self):
570 pass
571
572 """
573 def wrapper(self, *args, **kwargs):
574
575 assert self._queue_lock is not None, "Queue should be open"
576 return fn(self, *args, **kwargs)
577 return wrapper
578
581 """Queue used to manage the jobs.
582
583 @cvar _RE_JOB_FILE: regex matching the valid job file names
584
585 """
586 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
587
589 """Constructor for JobQueue.
590
591 The constructor will initialize the job queue object and then
592 start loading the current jobs from disk, either for starting them
593 (if they were queued) or for aborting them (if they were already
594 running).
595
596 @type context: GanetiContext
597 @param context: the context object for access to the configuration
598 data and other ganeti objects
599
600 """
601 self.context = context
602 self._memcache = weakref.WeakValueDictionary()
603 self._my_hostname = utils.HostInfo().name
604
605
606 self._lock = threading.Lock()
607 self.acquire = self._lock.acquire
608 self.release = self._lock.release
609
610
611 self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
612
613
614 self._last_serial = jstore.ReadSerial()
615 assert self._last_serial is not None, ("Serial file was modified between"
616 " check in jstore and here")
617
618
619 self._nodes = dict((n.name, n.primary_ip)
620 for n in self.context.cfg.GetAllNodesInfo().values()
621 if n.master_candidate)
622
623
624 try:
625 del self._nodes[self._my_hostname]
626 except KeyError:
627 pass
628
629
630
631
632 self._wpool = _JobQueueWorkerPool(self)
633 try:
634
635
636 self.acquire()
637 try:
638 logging.info("Inspecting job queue")
639
640 all_job_ids = self._GetJobIDsUnlocked()
641 jobs_count = len(all_job_ids)
642 lastinfo = time.time()
643 for idx, job_id in enumerate(all_job_ids):
644
645 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
646 idx == (jobs_count - 1)):
647 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
648 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
649 lastinfo = time.time()
650
651 job = self._LoadJobUnlocked(job_id)
652
653
654 if job is None:
655 continue
656
657 status = job.CalcStatus()
658
659 if status in (constants.JOB_STATUS_QUEUED, ):
660 self._wpool.AddTask(job)
661
662 elif status in (constants.JOB_STATUS_RUNNING,
663 constants.JOB_STATUS_WAITLOCK,
664 constants.JOB_STATUS_CANCELING):
665 logging.warning("Unfinished job %s found: %s", job.id, job)
666 try:
667 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
668 "Unclean master daemon shutdown")
669 finally:
670 self.UpdateJobUnlocked(job)
671
672 logging.info("Job queue inspection finished")
673 finally:
674 self.release()
675 except:
676 self._wpool.TerminateWorkers()
677 raise
678
679 @utils.LockedMethod
680 @_RequireOpenQueue
682 """Register a new node with the queue.
683
684 @type node: L{objects.Node}
685 @param node: the node object to be added
686
687 """
688 node_name = node.name
689 assert node_name != self._my_hostname
690
691
692 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
693 msg = result.fail_msg
694 if msg:
695 logging.warning("Cannot cleanup queue directory on node %s: %s",
696 node_name, msg)
697
698 if not node.master_candidate:
699
700 self._nodes.pop(node_name, None)
701
702 return
703
704
705 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
706
707
708 files.append(constants.JOB_QUEUE_SERIAL_FILE)
709
710 for file_name in files:
711
712 content = utils.ReadFile(file_name)
713
714 result = rpc.RpcRunner.call_jobqueue_update([node_name],
715 [node.primary_ip],
716 file_name, content)
717 msg = result[node_name].fail_msg
718 if msg:
719 logging.error("Failed to upload file %s to node %s: %s",
720 file_name, node_name, msg)
721
722 self._nodes[node_name] = node.primary_ip
723
724 @utils.LockedMethod
725 @_RequireOpenQueue
727 """Callback called when removing nodes from the cluster.
728
729 @type node_name: str
730 @param node_name: the name of the node to remove
731
732 """
733 try:
734
735 del self._nodes[node_name]
736 except KeyError:
737 pass
738
739 @staticmethod
741 """Verifies the status of an RPC call.
742
743 Since we aim to keep consistency should this node (the current
744 master) fail, we will log errors if our rpc fail, and especially
745 log the case when more than half of the nodes fails.
746
747 @param result: the data as returned from the rpc call
748 @type nodes: list
749 @param nodes: the list of nodes we made the call to
750 @type failmsg: str
751 @param failmsg: the identifier to be used for logging
752
753 """
754 failed = []
755 success = []
756
757 for node in nodes:
758 msg = result[node].fail_msg
759 if msg:
760 failed.append(node)
761 logging.error("RPC call %s (%s) failed on node %s: %s",
762 result[node].call, failmsg, node, msg)
763 else:
764 success.append(node)
765
766
767 if (len(success) + 1) < len(failed):
768
769 logging.error("More than half of the nodes failed")
770
772 """Helper for returning the node name/ip list.
773
774 @rtype: (list, list)
775 @return: a tuple of two lists, the first one with the node
776 names and the second one with the node addresses
777
778 """
779 name_list = self._nodes.keys()
780 addr_list = [self._nodes[name] for name in name_list]
781 return name_list, addr_list
782
784 """Writes a file locally and then replicates it to all nodes.
785
786 This function will replace the contents of a file on the local
787 node and then replicate it to all the other nodes we have.
788
789 @type file_name: str
790 @param file_name: the path of the file to be replicated
791 @type data: str
792 @param data: the new contents of the file
793
794 """
795 utils.WriteFile(file_name, data=data)
796
797 names, addrs = self._GetNodeIp()
798 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
799 self._CheckRpcResult(result, self._nodes,
800 "Updating %s" % file_name)
801
803 """Renames a file locally and then replicate the change.
804
805 This function will rename a file in the local queue directory
806 and then replicate this rename to all the other nodes we have.
807
808 @type rename: list of (old, new)
809 @param rename: List containing tuples mapping old to new names
810
811 """
812
813 for old, new in rename:
814 utils.RenameFile(old, new, mkdir=True)
815
816
817 names, addrs = self._GetNodeIp()
818 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
819 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
820
821 @staticmethod
841
842 @classmethod
844 """Returns the archive directory for a job.
845
846 @type job_id: str
847 @param job_id: Job identifier
848 @rtype: str
849 @return: Directory name
850
851 """
852 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
853
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster.

  @type count: integer
  @param count: how many serials to return
  @rtype: list of str
  @return: a list with exactly C{count} newly allocated job identifiers

  """
  assert count > 0

  # New highest serial once all requested IDs have been handed out
  serial = self._last_serial + count

  # Write (and replicate) the new serial before any ID is returned
  self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                      "%s\n" % serial)

  # Start right after the last recorded serial: including _last_serial
  # itself would return count + 1 identifiers and hand out again a serial
  # that has already been recorded as allocated.
  result = [self._FormatJobID(v)
            for v in range(self._last_serial + 1, serial + 1)]

  # Only remember the new serial after the file was written successfully
  self._last_serial = serial

  assert len(result) == count

  return result
879
880 @staticmethod
882 """Returns the job file for a given job id.
883
884 @type job_id: str
885 @param job_id: the job identifier
886 @rtype: str
887 @return: the path to the job file
888
889 """
890 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
891
892 @classmethod
904
905 @classmethod
907 """Extract the job id from a filename.
908
909 @type name: str
910 @param name: the job filename
911 @rtype: job id or None
912 @return: the job id corresponding to the given filename,
913 or None if the filename does not represent a valid
914 job file
915
916 """
917 m = cls._RE_JOB_FILE.match(name)
918 if m:
919 return m.group(1)
920 else:
921 return None
922
924 """Return all known job IDs.
925
926 If the parameter archived is True, archived jobs IDs will be
927 included. Currently this argument is unused.
928
929 The method only looks at disk because it's a requirement that all
930 jobs are present on disk (so in the _memcache we don't have any
931 extra IDs).
932
933 @rtype: list
934 @return: the list of job IDs
935
936 """
937
938 jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
939 jlist = utils.NiceSort(jlist)
940 return jlist
941
951
953 """Loads a job from the disk or memory.
954
955 Given a job id, this will return the cached job object if
956 existing, or try to load the job from the disk. If loading from
957 disk, it will also add the job to the cache.
958
959 @param job_id: the job id
960 @rtype: L{_QueuedJob} or None
961 @return: either None or the job object
962
963 """
964 job = self._memcache.get(job_id, None)
965 if job:
966 logging.debug("Found job %s in memcache", job_id)
967 return job
968
969 filepath = self._GetJobPath(job_id)
970 logging.debug("Loading job from %s", filepath)
971 try:
972 raw_data = utils.ReadFile(filepath)
973 except IOError, err:
974 if err.errno in (errno.ENOENT, ):
975 return None
976 raise
977
978 data = serializer.LoadJson(raw_data)
979
980 try:
981 job = _QueuedJob.Restore(self, data)
982 except Exception, err:
983 new_path = self._GetArchivedJobPath(job_id)
984 if filepath == new_path:
985
986 logging.exception("Can't parse job %s", job_id)
987 else:
988
989 logging.exception("Can't parse job %s, will archive.", job_id)
990 self._RenameFilesUnlocked([(filepath, new_path)])
991 return None
992
993 self._memcache[job_id] = job
994 logging.debug("Added job %s to the cache", job_id)
995 return job
996
998 """Return a list of jobs based on their IDs.
999
1000 @type job_ids: list
1001 @param job_ids: either an empty list (meaning all jobs),
1002 or a list of job IDs
1003 @rtype: list
1004 @return: the list of job objects
1005
1006 """
1007 if not job_ids:
1008 job_ids = self._GetJobIDsUnlocked()
1009
1010 return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1011
1012 @staticmethod
1014 """Check if the queue is marked for drain.
1015
1016 This currently uses the queue drain file, which makes it a
1017 per-node flag. In the future this can be moved to the config file.
1018
1019 @rtype: boolean
1020 @return: True if the job queue is marked for draining
1021
1022 """
1023 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1024
1025 @staticmethod
1041
1042 @_RequireOpenQueue
1044 """Create and store a new job.
1045
1046 This enters the job into our job queue and also puts it on the new
1047 queue, in order for it to be picked up by the queue processors.
1048
1049 @type job_id: job ID
1050 @param job_id: the job ID for the new job
1051 @type ops: list
1052 @param ops: The list of OpCodes that will become the new job.
1053 @rtype: job ID
1054 @return: the job ID of the newly created job
1055 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1056
1057 """
1058 if self._IsQueueMarkedDrain():
1059 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1060
1061
1062 size = len(self._ListJobFiles())
1063 if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1064
1065
1066
1067 pass
1068
1069 if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1070 raise errors.JobQueueFull()
1071
1072 job = _QueuedJob(self, job_id, ops)
1073
1074
1075 self.UpdateJobUnlocked(job)
1076
1077 logging.debug("Adding new job %s to the cache", job_id)
1078 self._memcache[job_id] = job
1079
1080
1081 self._wpool.AddTask(job)
1082
1083 return job.id
1084
1085 @utils.LockedMethod
1086 @_RequireOpenQueue
1095
1096 @utils.LockedMethod
1097 @_RequireOpenQueue
1099 """Create and store multiple jobs.
1100
1101 @see: L{_SubmitJobUnlocked}
1102
1103 """
1104 results = []
1105 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1106 for job_id, ops in zip(all_job_ids, jobs):
1107 try:
1108 data = self._SubmitJobUnlocked(job_id, ops)
1109 status = True
1110 except errors.GenericError, err:
1111 data = str(err)
1112 status = False
1113 results.append((status, data))
1114
1115 return results
1116
1117 @_RequireOpenQueue
1119 """Update a job's on disk storage.
1120
1121 After a job has been modified, this function needs to be called in
1122 order to write the changes to disk and replicate them to the other
1123 nodes.
1124
1125 @type job: L{_QueuedJob}
1126 @param job: the changed job
1127
1128 """
1129 filename = self._GetJobPath(job.id)
1130 data = serializer.DumpJson(job.Serialize(), indent=False)
1131 logging.debug("Writing job %s to %s", job.id, filename)
1132 self._WriteAndReplicateFileUnlocked(filename, data)
1133
1134
1135 job.change.notifyAll()
1136
1137 @utils.LockedMethod
1138 @_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Blocks until a job changes or the timeout expires.

  @type job_id: string
  @param job_id: identifier of the job to watch
  @type fields: list of strings
  @param fields: names of the job fields that are compared for changes
  @type prev_job_info: list or None
  @param prev_job_info: the job information returned by the previous call
  @type prev_log_serial: int
  @param prev_log_serial: serial number of the last log message seen
  @type timeout: float
  @param timeout: maximum time to wait
  @rtype: tuple (job info, log entries)
  @return: the job information selected via C{fields} together with the
      list of newer log entries, as soon as a change is detected;
      C{None} if the job cannot be loaded; if nothing changed before the
      timeout expired, the special value L{constants.JOB_NOTCHANGED},
      which clients must interpret as such

  """
  job = self._LoadJobUnlocked(job_id)
  if not job:
    logging.debug("Job %s not found", job_id)
    return None

  def _JsonRoundTrip(value):
    # Push the value through the serializer and back, so that it compares
    # against prev_job_info in its serialized shape.
    # NOTE(review): presumably prev_job_info went through the same
    # (de)serialization on its way from the client -- confirm.
    return serializer.LoadJson(serializer.DumpJson(value))

  def _PollJob():
    logging.debug("Waiting for changes in job %s", job_id)

    status = job.CalcStatus()
    raw_info = self._GetJobInfoUnlocked(job, fields)
    raw_log = job.GetLogEntries(prev_log_serial)

    job_info = _JsonRoundTrip(raw_info)
    log_entries = _JsonRoundTrip(raw_log)

    # A job that is still queued, running or waiting for locks only counts
    # as changed if its fields differ or new log entries are available; in
    # any other status the current state is reported straight away.
    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_RUNNING,
                  constants.JOB_STATUS_WAITLOCK):
      if not (prev_job_info != job_info or
              (log_entries and prev_log_serial != log_entries[0][0])):
        raise utils.RetryAgain()

    logging.debug("Job %s changed", job_id)
    return (job_info, log_entries)

  try:
    # The job's condition variable is handed over as the wait function
    result = utils.Retry(_PollJob, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
  except utils.RetryTimeout:
    return constants.JOB_NOTCHANGED

  return result
1201
1202 @utils.LockedMethod
1203 @_RequireOpenQueue
1238
1239 @_RequireOpenQueue
1249
1250 @_RequireOpenQueue
1252 """Archives jobs.
1253
1254 @type jobs: list of L{_QueuedJob}
1255 @param jobs: Job objects
1256 @rtype: int
1257 @return: Number of archived jobs
1258
1259 """
1260 archive_jobs = []
1261 rename_files = []
1262 for job in jobs:
1263 if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1264 constants.JOB_STATUS_SUCCESS,
1265 constants.JOB_STATUS_ERROR):
1266 logging.debug("Job %s is not yet done", job.id)
1267 continue
1268
1269 archive_jobs.append(job)
1270
1271 old = self._GetJobPath(job.id)
1272 new = self._GetArchivedJobPath(job.id)
1273 rename_files.append((old, new))
1274
1275
1276 self._RenameFilesUnlocked(rename_files)
1277
1278 logging.debug("Successfully archived job(s) %s",
1279 utils.CommaJoin(job.id for job in archive_jobs))
1280
1281 return len(archive_jobs)
1282
1283 @utils.LockedMethod
1284 @_RequireOpenQueue
1286 """Archives a job.
1287
1288 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1289
1290 @type job_id: string
1291 @param job_id: Job ID of job to be archived.
1292 @rtype: bool
1293 @return: Whether job was archived
1294
1295 """
1296 logging.info("Archiving job %s", job_id)
1297
1298 job = self._LoadJobUnlocked(job_id)
1299 if not job:
1300 logging.debug("Job %s not found", job_id)
1301 return False
1302
1303 return self._ArchiveJobsUnlocked([job]) == 1
1304
1305 @utils.LockedMethod
1306 @_RequireOpenQueue
1308 """Archives all jobs based on age.
1309
1310 The method will archive all jobs which are older than the age
1311 parameter. For jobs that don't have an end timestamp, the start
1312 timestamp will be considered. The special '-1' age will cause
1313 archival of all jobs (that are not running or queued).
1314
1315 @type age: int
1316 @param age: the minimum age in seconds
1317
1318 """
1319 logging.info("Archiving jobs with age more than %s seconds", age)
1320
1321 now = time.time()
1322 end_time = now + timeout
1323 archived_count = 0
1324 last_touched = 0
1325
1326 all_job_ids = self._GetJobIDsUnlocked(archived=False)
1327 pending = []
1328 for idx, job_id in enumerate(all_job_ids):
1329 last_touched = idx + 1
1330
1331
1332
1333
1334 if time.time() > end_time:
1335 break
1336
1337
1338 job = self._LoadJobUnlocked(job_id)
1339 if job:
1340 if job.end_timestamp is None:
1341 if job.start_timestamp is None:
1342 job_age = job.received_timestamp
1343 else:
1344 job_age = job.start_timestamp
1345 else:
1346 job_age = job.end_timestamp
1347
1348 if age == -1 or now - job_age[0] > age:
1349 pending.append(job)
1350
1351
1352 if len(pending) >= 10:
1353 archived_count += self._ArchiveJobsUnlocked(pending)
1354 pending = []
1355
1356 if pending:
1357 archived_count += self._ArchiveJobsUnlocked(pending)
1358
1359 return (archived_count, len(all_job_ids) - last_touched)
1360
1361 @staticmethod
1363 """Returns information about a job.
1364
1365 @type job: L{_QueuedJob}
1366 @param job: the job which we query
1367 @type fields: list
1368 @param fields: names of fields to return
1369 @rtype: list
1370 @return: list with one element for each field
1371 @raise errors.OpExecError: when an invalid field
1372 has been passed
1373
1374 """
1375 row = []
1376 for fname in fields:
1377 if fname == "id":
1378 row.append(job.id)
1379 elif fname == "status":
1380 row.append(job.CalcStatus())
1381 elif fname == "ops":
1382 row.append([op.input.__getstate__() for op in job.ops])
1383 elif fname == "opresult":
1384 row.append([op.result for op in job.ops])
1385 elif fname == "opstatus":
1386 row.append([op.status for op in job.ops])
1387 elif fname == "oplog":
1388 row.append([op.log for op in job.ops])
1389 elif fname == "opstart":
1390 row.append([op.start_timestamp for op in job.ops])
1391 elif fname == "opexec":
1392 row.append([op.exec_timestamp for op in job.ops])
1393 elif fname == "opend":
1394 row.append([op.end_timestamp for op in job.ops])
1395 elif fname == "received_ts":
1396 row.append(job.received_timestamp)
1397 elif fname == "start_ts":
1398 row.append(job.start_timestamp)
1399 elif fname == "end_ts":
1400 row.append(job.end_timestamp)
1401 elif fname == "lock_status":
1402 row.append(job.lock_status)
1403 elif fname == "summary":
1404 row.append([op.input.Summary() for op in job.ops])
1405 else:
1406 raise errors.OpExecError("Invalid job query field '%s'" % fname)
1407 return row
1408
1409 @utils.LockedMethod
1410 @_RequireOpenQueue
1412 """Returns a list of jobs in queue.
1413
1414 This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1415 processing for each job.
1416
1417 @type job_ids: list
1418 @param job_ids: sequence of job identifiers or None for all
1419 @type fields: list
1420 @param fields: names of fields to return
1421 @rtype: list
1422 @return: list one element per job, each element being list with
1423 the requested fields
1424
1425 """
1426 jobs = []
1427
1428 for job in self._GetJobsUnlocked(job_ids):
1429 if job is None:
1430 jobs.append(None)
1431 else:
1432 jobs.append(self._GetJobInfoUnlocked(job, fields))
1433
1434 return jobs
1435
1436 @utils.LockedMethod
1437 @_RequireOpenQueue
1439 """Stops the job queue.
1440
1441 This shuts down all the worker threads and closes the queue.
1442
1443 """
1444 self._wpool.TerminateWorkers()
1445
1446 self._queue_lock.Close()
1447 self._queue_lock = None
1448