Package ganeti :: Module jqueue
[hide private]
[frames | no frames]

Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import logging 
  33  import errno 
  34  import time 
  35  import weakref 
  36  import threading 
  37  import itertools 
  38   
  39  try: 
  40    # pylint: disable=E0611 
  41    from pyinotify import pyinotify 
  42  except ImportError: 
  43    import pyinotify 
  44   
  45  from ganeti import asyncnotifier 
  46  from ganeti import constants 
  47  from ganeti import serializer 
  48  from ganeti import workerpool 
  49  from ganeti import locking 
  50  from ganeti import opcodes 
  51  from ganeti import errors 
  52  from ganeti import mcpu 
  53  from ganeti import utils 
  54  from ganeti import jstore 
  55  from ganeti import rpc 
  56  from ganeti import runtime 
  57  from ganeti import netutils 
  58  from ganeti import compat 
  59  from ganeti import ht 
  60   
  61   
  62  JOBQUEUE_THREADS = 25 
  63  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
  64   
  65  # member lock names to be passed to @ssynchronized decorator 
  66  _LOCK = "_lock" 
  67  _QUEUE = "_queue" 
class CancelJob(Exception):
  """Exception used to signal that the current job must be canceled.

  It carries no payload of its own; raising it is the whole message.

  """
74
def TimeStampNow():
  """Returns the current time as a split timestamp.

  The wall-clock time is read once and handed to L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
84
85 86 -class _QueuedOpCode(object):
87 """Encapsulates an opcode object. 88 89 @ivar log: holds the execution log and consists of tuples 90 of the form C{(log_serial, timestamp, level, message)} 91 @ivar input: the OpCode we encapsulate 92 @ivar status: the current status 93 @ivar result: the result of the LU execution 94 @ivar start_timestamp: timestamp for the start of the execution 95 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 96 @ivar stop_timestamp: timestamp for the end of the execution 97 98 """ 99 __slots__ = ["input", "status", "result", "log", "priority", 100 "start_timestamp", "exec_timestamp", "end_timestamp", 101 "__weakref__"] 102
103 - def __init__(self, op):
104 """Constructor for the _QuededOpCode. 105 106 @type op: L{opcodes.OpCode} 107 @param op: the opcode we encapsulate 108 109 """ 110 self.input = op 111 self.status = constants.OP_STATUS_QUEUED 112 self.result = None 113 self.log = [] 114 self.start_timestamp = None 115 self.exec_timestamp = None 116 self.end_timestamp = None 117 118 # Get initial priority (it might change during the lifetime of this opcode) 119 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120 121 @classmethod
122 - def Restore(cls, state):
123 """Restore the _QueuedOpCode from the serialized form. 124 125 @type state: dict 126 @param state: the serialized state 127 @rtype: _QueuedOpCode 128 @return: a new _QueuedOpCode instance 129 130 """ 131 obj = _QueuedOpCode.__new__(cls) 132 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 133 obj.status = state["status"] 134 obj.result = state["result"] 135 obj.log = state["log"] 136 obj.start_timestamp = state.get("start_timestamp", None) 137 obj.exec_timestamp = state.get("exec_timestamp", None) 138 obj.end_timestamp = state.get("end_timestamp", None) 139 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 140 return obj
141
142 - def Serialize(self):
143 """Serializes this _QueuedOpCode. 144 145 @rtype: dict 146 @return: the dictionary holding the serialized state 147 148 """ 149 return { 150 "input": self.input.__getstate__(), 151 "status": self.status, 152 "result": self.result, 153 "log": self.log, 154 "start_timestamp": self.start_timestamp, 155 "exec_timestamp": self.exec_timestamp, 156 "end_timestamp": self.end_timestamp, 157 "priority": self.priority, 158 }
159
160 161 -class _QueuedJob(object):
162 """In-memory job representation. 163 164 This is what we use to track the user-submitted jobs. Locking must 165 be taken care of by users of this class. 166 167 @type queue: L{JobQueue} 168 @ivar queue: the parent queue 169 @ivar id: the job ID 170 @type ops: list 171 @ivar ops: the list of _QueuedOpCode that constitute the job 172 @type log_serial: int 173 @ivar log_serial: holds the index for the next log entry 174 @ivar received_timestamp: the timestamp for when the job was received 175 @ivar start_timestmap: the timestamp for start of execution 176 @ivar end_timestamp: the timestamp for end of execution 177 @ivar writable: Whether the job is allowed to be modified 178 179 """ 180 # pylint: disable=W0212 181 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 182 "received_timestamp", "start_timestamp", "end_timestamp", 183 "__weakref__", "processor_lock", "writable"] 184
185 - def __init__(self, queue, job_id, ops, writable):
186 """Constructor for the _QueuedJob. 187 188 @type queue: L{JobQueue} 189 @param queue: our parent queue 190 @type job_id: job_id 191 @param job_id: our job id 192 @type ops: list 193 @param ops: the list of opcodes we hold, which will be encapsulated 194 in _QueuedOpCodes 195 @type writable: bool 196 @param writable: Whether job can be modified 197 198 """ 199 if not ops: 200 raise errors.GenericError("A job needs at least one opcode") 201 202 self.queue = queue 203 self.id = job_id 204 self.ops = [_QueuedOpCode(op) for op in ops] 205 self.log_serial = 0 206 self.received_timestamp = TimeStampNow() 207 self.start_timestamp = None 208 self.end_timestamp = None 209 210 self._InitInMemory(self, writable)
211 212 @staticmethod
213 - def _InitInMemory(obj, writable):
214 """Initializes in-memory variables. 215 216 """ 217 obj.writable = writable 218 obj.ops_iter = None 219 obj.cur_opctx = None 220 221 # Read-only jobs are not processed and therefore don't need a lock 222 if writable: 223 obj.processor_lock = threading.Lock() 224 else: 225 obj.processor_lock = None
226
227 - def __repr__(self):
228 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 229 "id=%s" % self.id, 230 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 231 232 return "<%s at %#x>" % (" ".join(status), id(self))
233 234 @classmethod
235 - def Restore(cls, queue, state, writable):
236 """Restore a _QueuedJob from serialized state: 237 238 @type queue: L{JobQueue} 239 @param queue: to which queue the restored job belongs 240 @type state: dict 241 @param state: the serialized state 242 @type writable: bool 243 @param writable: Whether job can be modified 244 @rtype: _JobQueue 245 @return: the restored _JobQueue instance 246 247 """ 248 obj = _QueuedJob.__new__(cls) 249 obj.queue = queue 250 obj.id = state["id"] 251 obj.received_timestamp = state.get("received_timestamp", None) 252 obj.start_timestamp = state.get("start_timestamp", None) 253 obj.end_timestamp = state.get("end_timestamp", None) 254 255 obj.ops = [] 256 obj.log_serial = 0 257 for op_state in state["ops"]: 258 op = _QueuedOpCode.Restore(op_state) 259 for log_entry in op.log: 260 obj.log_serial = max(obj.log_serial, log_entry[0]) 261 obj.ops.append(op) 262 263 cls._InitInMemory(obj, writable) 264 265 return obj
266
267 - def Serialize(self):
268 """Serialize the _JobQueue instance. 269 270 @rtype: dict 271 @return: the serialized state 272 273 """ 274 return { 275 "id": self.id, 276 "ops": [op.Serialize() for op in self.ops], 277 "start_timestamp": self.start_timestamp, 278 "end_timestamp": self.end_timestamp, 279 "received_timestamp": self.received_timestamp, 280 }
281
282 - def CalcStatus(self):
283 """Compute the status of this job. 284 285 This function iterates over all the _QueuedOpCodes in the job and 286 based on their status, computes the job status. 287 288 The algorithm is: 289 - if we find a cancelled, or finished with error, the job 290 status will be the same 291 - otherwise, the last opcode with the status one of: 292 - waitlock 293 - canceling 294 - running 295 296 will determine the job status 297 298 - otherwise, it means either all opcodes are queued, or success, 299 and the job status will be the same 300 301 @return: the job status 302 303 """ 304 status = constants.JOB_STATUS_QUEUED 305 306 all_success = True 307 for op in self.ops: 308 if op.status == constants.OP_STATUS_SUCCESS: 309 continue 310 311 all_success = False 312 313 if op.status == constants.OP_STATUS_QUEUED: 314 pass 315 elif op.status == constants.OP_STATUS_WAITING: 316 status = constants.JOB_STATUS_WAITING 317 elif op.status == constants.OP_STATUS_RUNNING: 318 status = constants.JOB_STATUS_RUNNING 319 elif op.status == constants.OP_STATUS_CANCELING: 320 status = constants.JOB_STATUS_CANCELING 321 break 322 elif op.status == constants.OP_STATUS_ERROR: 323 status = constants.JOB_STATUS_ERROR 324 # The whole job fails if one opcode failed 325 break 326 elif op.status == constants.OP_STATUS_CANCELED: 327 status = constants.OP_STATUS_CANCELED 328 break 329 330 if all_success: 331 status = constants.JOB_STATUS_SUCCESS 332 333 return status
334
335 - def CalcPriority(self):
336 """Gets the current priority for this job. 337 338 Only unfinished opcodes are considered. When all are done, the default 339 priority is used. 340 341 @rtype: int 342 343 """ 344 priorities = [op.priority for op in self.ops 345 if op.status not in constants.OPS_FINALIZED] 346 347 if not priorities: 348 # All opcodes are done, assume default priority 349 return constants.OP_PRIO_DEFAULT 350 351 return min(priorities)
352
353 - def GetLogEntries(self, newer_than):
354 """Selectively returns the log entries. 355 356 @type newer_than: None or int 357 @param newer_than: if this is None, return all log entries, 358 otherwise return only the log entries with serial higher 359 than this value 360 @rtype: list 361 @return: the list of the log entries selected 362 363 """ 364 if newer_than is None: 365 serial = -1 366 else: 367 serial = newer_than 368 369 entries = [] 370 for op in self.ops: 371 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 372 373 return entries
374
375 - def GetInfo(self, fields):
376 """Returns information about a job. 377 378 @type fields: list 379 @param fields: names of fields to return 380 @rtype: list 381 @return: list with one element for each field 382 @raise errors.OpExecError: when an invalid field 383 has been passed 384 385 """ 386 row = [] 387 for fname in fields: 388 if fname == "id": 389 row.append(self.id) 390 elif fname == "status": 391 row.append(self.CalcStatus()) 392 elif fname == "priority": 393 row.append(self.CalcPriority()) 394 elif fname == "ops": 395 row.append([op.input.__getstate__() for op in self.ops]) 396 elif fname == "opresult": 397 row.append([op.result for op in self.ops]) 398 elif fname == "opstatus": 399 row.append([op.status for op in self.ops]) 400 elif fname == "oplog": 401 row.append([op.log for op in self.ops]) 402 elif fname == "opstart": 403 row.append([op.start_timestamp for op in self.ops]) 404 elif fname == "opexec": 405 row.append([op.exec_timestamp for op in self.ops]) 406 elif fname == "opend": 407 row.append([op.end_timestamp for op in self.ops]) 408 elif fname == "oppriority": 409 row.append([op.priority for op in self.ops]) 410 elif fname == "received_ts": 411 row.append(self.received_timestamp) 412 elif fname == "start_ts": 413 row.append(self.start_timestamp) 414 elif fname == "end_ts": 415 row.append(self.end_timestamp) 416 elif fname == "summary": 417 row.append([op.input.Summary() for op in self.ops]) 418 else: 419 raise errors.OpExecError("Invalid self query field '%s'" % fname) 420 return row
421
422 - def MarkUnfinishedOps(self, status, result):
423 """Mark unfinished opcodes with a given status and result. 424 425 This is an utility function for marking all running or waiting to 426 be run opcodes with a given status. Opcodes which are already 427 finalised are not changed. 428 429 @param status: a given opcode status 430 @param result: the opcode result 431 432 """ 433 not_marked = True 434 for op in self.ops: 435 if op.status in constants.OPS_FINALIZED: 436 assert not_marked, "Finalized opcodes found after non-finalized ones" 437 continue 438 op.status = status 439 op.result = result 440 not_marked = False
441
442 - def Finalize(self):
443 """Marks the job as finalized. 444 445 """ 446 self.end_timestamp = TimeStampNow()
447
448 - def Cancel(self):
449 """Marks job as canceled/-ing if possible. 450 451 @rtype: tuple; (bool, string) 452 @return: Boolean describing whether job was successfully canceled or marked 453 as canceling and a text message 454 455 """ 456 status = self.CalcStatus() 457 458 if status == constants.JOB_STATUS_QUEUED: 459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 460 "Job canceled by request") 461 self.Finalize() 462 return (True, "Job %s canceled" % self.id) 463 464 elif status == constants.JOB_STATUS_WAITING: 465 # The worker will notice the new status and cancel the job 466 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 467 return (True, "Job %s will be canceled" % self.id) 468 469 else: 470 logging.debug("Job %s is no longer waiting in the queue", self.id) 471 return (False, "Job %s is no longer waiting in the queue" % self.id)
472
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Stores the queue, job and opcode the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises L{CancelJob} if the opcode is marked as canceling.

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      return

    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    op = self._op
    job = self._job

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # A pending cancel request wins over starting the opcode
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Hand the changed job to the queue's update function
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Appends a log entry to the opcode, under the queue lock.

    The job-wide log serial is incremented first and used for the new entry.

    """
    job = self._job

    serial = job.log_serial + 1
    job.log_serial = serial

    self._op.log.append((serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either a single message, which is logged with type
    C{constants.ELOG_MESSAGE}, or a C{(log_type, log_msg)} pair.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_msg, ) = args
      log_type = constants.ELOG_MESSAGE
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    Raises L{CancelJob} via L{_CheckCancel} if so.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue does it
    return self._queue.SubmitManyJobs(jobs)
572
573 574 -class _JobChangesChecker(object):
575 - def __init__(self, fields, prev_job_info, prev_log_serial):
576 """Initializes this class. 577 578 @type fields: list of strings 579 @param fields: Fields requested by LUXI client 580 @type prev_job_info: string 581 @param prev_job_info: previous job info, as passed by the LUXI client 582 @type prev_log_serial: string 583 @param prev_log_serial: previous job serial, as passed by the LUXI client 584 585 """ 586 self._fields = fields 587 self._prev_job_info = prev_job_info 588 self._prev_log_serial = prev_log_serial
589
590 - def __call__(self, job):
591 """Checks whether job has changed. 592 593 @type job: L{_QueuedJob} 594 @param job: Job object 595 596 """ 597 assert not job.writable, "Expected read-only job" 598 599 status = job.CalcStatus() 600 job_info = job.GetInfo(self._fields) 601 log_entries = job.GetLogEntries(self._prev_log_serial) 602 603 # Serializing and deserializing data can cause type changes (e.g. from 604 # tuple to list) or precision loss. We're doing it here so that we get 605 # the same modifications as the data received from the client. Without 606 # this, the comparison afterwards might fail without the data being 607 # significantly different. 608 # TODO: we just deserialized from disk, investigate how to make sure that 609 # the job info and log entries are compatible to avoid this further step. 610 # TODO: Doing something like in testutils.py:UnifyValueType might be more 611 # efficient, though floats will be tricky 612 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 613 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 614 615 # Don't even try to wait if the job is no longer running, there will be 616 # no changes. 617 if (status not in (constants.JOB_STATUS_QUEUED, 618 constants.JOB_STATUS_RUNNING, 619 constants.JOB_STATUS_WAITING) or 620 job_info != self._prev_job_info or 621 (log_entries and self._prev_log_serial != log_entries[0][0])): 622 logging.debug("Job %s changed", job.id) 623 return (job_info, log_entries) 624 625 return None
626
class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Sets up an inotify watch on the given job file.

    @type filename: string
    @param filename: Path to job file
    @raise errors.InotifyError: if the notifier cannot be setup

    """
    wm = pyinotify.WatchManager()
    self._wm = wm

    handler = asyncnotifier.SingleFileEventHandler(wm, self._OnInotify,
                                                   filename)
    self._inotify_handler = handler

    notifier = pyinotify.Notifier(wm, default_proc_fun=handler)
    self._notifier = notifier

    try:
      handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify; re-enables the handler if it was disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is passed on multiplied by 1000
    have_events = notifier.check_events(timeout * 1000)
    if not have_events:
      return have_events

    notifier.read_events()
    notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
676
677 678 -class _JobChangesWaiter(object):
679 - def __init__(self, filename):
680 """Initializes this class. 681 682 @type filename: string 683 @param filename: Path to job file 684 685 """ 686 self._filewaiter = None 687 self._filename = filename
688
689 - def Wait(self, timeout):
690 """Waits for a job to change. 691 692 @type timeout: float 693 @param timeout: Timeout in seconds 694 @return: Whether there have been events 695 696 """ 697 if self._filewaiter: 698 return self._filewaiter.Wait(timeout) 699 700 # Lazy setup: Avoid inotify setup cost when job file has already changed. 701 # If this point is reached, return immediately and let caller check the job 702 # file again in case there were changes since the last check. This avoids a 703 # race condition. 704 self._filewaiter = _JobFileChangesWaiter(self._filename) 705 706 return True
707
708 - def Close(self):
709 """Closes underlying waiter. 710 711 """ 712 if self._filewaiter: 713 self._filewaiter.Close()
714
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial a client saw last, it reports back
  once the current job state differs from that.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job once and checks it for changes.

    @param counter: iterator counting the calls made so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} if the job is unchanged
    @raise errors.JobLost: if the job could not be loaded
    @raise utils.RetryAgain: if the job has not changed

    """
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is not None:
      return result

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: the checker's result on change, C{None} if the job was lost or
      inotify failed, C{constants.JOB_NOTCHANGED} on timeout

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        check_once = compat.partial(self._CheckForChanges,
                                    counter, job_load_fn, check_fn)
        return utils.Retry(check_once, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter is closed no matter how the retry loop ends
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
774
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Anything that is not an L{errors.GenericError} is first wrapped, via its
  string form, into an L{errors.OpExecError}.

  @param err: the exception to encode
  @return: the result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
786
787 788 -class _TimeoutStrategyWrapper:
789 - def __init__(self, fn):
790 """Initializes this class. 791 792 """ 793 self._fn = fn 794 self._next = None
795
796 - def _Advance(self):
797 """Gets the next timeout if necessary. 798 799 """ 800 if self._next is None: 801 self._next = self._fn()
802
803 - def Peek(self):
804 """Returns the next timeout. 805 806 """ 807 self._Advance() 808 return self._next
809
810 - def Next(self):
811 """Returns the current timeout and advances the internal state. 812 813 """ 814 self._Advance() 815 result = self._next 816 self._next = None 817 return result
818
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Sets up the execution context for one opcode.

    @param op: Queued opcode; its C{input} attribute is the actual opcode
    @param index: Value stored as C{index}
    @param log_prefix: Prefix for log messages about this opcode
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    opcode = op.input

    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = opcode.Summary()

    # Work on a private copy of the dependencies, the list gets modified
    self.jobdeps = None
    if getattr(opcode, opcodes.DEPEND_ATTR, None):
      self.jobdeps = opcode.depends[:]

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Starts over with a timeout strategy freshly made by the factory.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the opcode's priority was changed

    """
    # Retries not exhausted yet, the next round still has a timeout
    if self._timeout_strategy.Peek() is not None:
      return False

    op = self.op

    # Already at the highest priority
    if not (op.priority > constants.OP_PRIO_HIGHEST):
      return False

    logging.debug("Increasing priority")
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
870
871 872 -class _JobProcessor(object):
873 (DEFER, 874 WAITDEP, 875 FINISHED) = range(1, 4) 876
877 - def __init__(self, queue, opexec_fn, job, 878 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
879 """Initializes this class. 880 881 """ 882 self.queue = queue 883 self.opexec_fn = opexec_fn 884 self.job = job 885 self._timeout_strategy_factory = _timeout_strategy_factory
886 887 @staticmethod
888 - def _FindNextOpcode(job, timeout_strategy_factory):
889 """Locates the next opcode to run. 890 891 @type job: L{_QueuedJob} 892 @param job: Job object 893 @param timeout_strategy_factory: Callable to create new timeout strategy 894 895 """ 896 # Create some sort of a cache to speed up locating next opcode for future 897 # lookups 898 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 899 # pending and one for processed ops. 900 if job.ops_iter is None: 901 job.ops_iter = enumerate(job.ops) 902 903 # Find next opcode to run 904 while True: 905 try: 906 (idx, op) = job.ops_iter.next() 907 except StopIteration: 908 raise errors.ProgrammerError("Called for a finished job") 909 910 if op.status == constants.OP_STATUS_RUNNING: 911 # Found an opcode already marked as running 912 raise errors.ProgrammerError("Called for job marked as running") 913 914 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 915 timeout_strategy_factory) 916 917 if op.status not in constants.OPS_FINALIZED: 918 return opctx 919 920 # This is a job that was partially completed before master daemon 921 # shutdown, so it can be expected that some opcodes are already 922 # completed successfully (if any did error out, then the whole job 923 # should have been aborted and not resubmitted for processing). 924 logging.info("%s: opcode %s already processed, skipping", 925 opctx.log_prefix, opctx.summary)
926 927 @staticmethod
928 - def _MarkWaitlock(job, op):
929 """Marks an opcode as waiting for locks. 930 931 The job's start timestamp is also set if necessary. 932 933 @type job: L{_QueuedJob} 934 @param job: Job object 935 @type op: L{_QueuedOpCode} 936 @param op: Opcode object 937 938 """ 939 assert op in job.ops 940 assert op.status in (constants.OP_STATUS_QUEUED, 941 constants.OP_STATUS_WAITING) 942 943 update = False 944 945 op.result = None 946 947 if op.status == constants.OP_STATUS_QUEUED: 948 op.status = constants.OP_STATUS_WAITING 949 update = True 950 951 if op.start_timestamp is None: 952 op.start_timestamp = TimeStampNow() 953 update = True 954 955 if job.start_timestamp is None: 956 job.start_timestamp = op.start_timestamp 957 update = True 958 959 assert op.status == constants.OP_STATUS_WAITING 960 961 return update
962 963 @staticmethod
964 - def _CheckDependencies(queue, job, opctx):
965 """Checks if an opcode has dependencies and if so, processes them. 966 967 @type queue: L{JobQueue} 968 @param queue: Queue object 969 @type job: L{_QueuedJob} 970 @param job: Job object 971 @type opctx: L{_OpExecContext} 972 @param opctx: Opcode execution context 973 @rtype: bool 974 @return: Whether opcode will be re-scheduled by dependency tracker 975 976 """ 977 op = opctx.op 978 979 result = False 980 981 while opctx.jobdeps: 982 (dep_job_id, dep_status) = opctx.jobdeps[0] 983 984 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 985 dep_status) 986 assert ht.TNonEmptyString(depmsg), "No dependency message" 987 988 logging.info("%s: %s", opctx.log_prefix, depmsg) 989 990 if depresult == _JobDependencyManager.CONTINUE: 991 # Remove dependency and continue 992 opctx.jobdeps.pop(0) 993 994 elif depresult == _JobDependencyManager.WAIT: 995 # Need to wait for notification, dependency tracker will re-add job 996 # to workerpool 997 result = True 998 break 999 1000 elif depresult == _JobDependencyManager.CANCEL: 1001 # Job was cancelled, cancel this job as well 1002 job.Cancel() 1003 assert op.status == constants.OP_STATUS_CANCELING 1004 break 1005 1006 elif depresult in (_JobDependencyManager.WRONGSTATUS, 1007 _JobDependencyManager.ERROR): 1008 # Job failed or there was an error, this job must fail 1009 op.status = constants.OP_STATUS_ERROR 1010 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 1011 break 1012 1013 else: 1014 raise errors.ProgrammerError("Unknown dependency result '%s'" % 1015 depresult) 1016 1017 return result
1018
1019 - def _ExecOpCodeUnlocked(self, opctx):
1020 """Processes one opcode and returns the result. 1021 1022 """ 1023 op = opctx.op 1024 1025 assert op.status == constants.OP_STATUS_WAITING 1026 1027 timeout = opctx.GetNextLockTimeout() 1028 1029 try: 1030 # Make sure not to hold queue lock while calling ExecOpCode 1031 result = self.opexec_fn(op.input, 1032 _OpExecCallbacks(self.queue, self.job, op), 1033 timeout=timeout, priority=op.priority) 1034 except mcpu.LockAcquireTimeout: 1035 assert timeout is not None, "Received timeout for blocking acquire" 1036 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1037 1038 assert op.status in (constants.OP_STATUS_WAITING, 1039 constants.OP_STATUS_CANCELING) 1040 1041 # Was job cancelled while we were waiting for the lock? 1042 if op.status == constants.OP_STATUS_CANCELING: 1043 return (constants.OP_STATUS_CANCELING, None) 1044 1045 # Stay in waitlock while trying to re-acquire lock 1046 return (constants.OP_STATUS_WAITING, None) 1047 except CancelJob: 1048 logging.exception("%s: Canceling job", opctx.log_prefix) 1049 assert op.status == constants.OP_STATUS_CANCELING 1050 return (constants.OP_STATUS_CANCELING, None) 1051 except Exception, err: # pylint: disable=W0703 1052 logging.exception("%s: Caught exception in %s", 1053 opctx.log_prefix, opctx.summary) 1054 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1055 else: 1056 logging.debug("%s: %s successful", 1057 opctx.log_prefix, opctx.summary) 1058 return (constants.OP_STATUS_SUCCESS, result)
1059
def __call__(self, _nextop_fn=None):
  """Continues execution of a job.

  Runs at most one opcode of the job per invocation. The queue lock is held
  in shared mode for the whole call, except while the opcode itself is being
  executed through C{_ExecOpCodeUnlocked}.

  @param _nextop_fn: Callback function for tests
  @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
    be deferred and C{WAITDEP} if the dependency manager
    (L{_JobDependencyManager}) will re-schedule the job when appropriate

  """
  queue = self.queue
  job = self.job

  logging.debug("Processing job %s", job.id)

  queue.acquire(shared=1)
  try:
    # Remember the opcode count up front; used below to detect the last opcode
    opcount = len(job.ops)

    assert job.writable, "Expected writable job"

    # Don't do anything for finalized jobs
    if job.CalcStatus() in constants.JOBS_FINALIZED:
      return self.FINISHED

    # Is a previous opcode still pending?
    if job.cur_opctx:
      # Resume the context stored by an earlier round (see "Keep around for
      # another round" below)
      opctx = job.cur_opctx
      job.cur_opctx = None
    else:
      # The test hook is only invoked when a new opcode is about to be picked
      if __debug__ and _nextop_fn:
        _nextop_fn()
      opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

    op = opctx.op

    # Consistency check
    assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                   constants.OP_STATUS_CANCELING)
                      for i in job.ops[opctx.index + 1:])

    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    assert (op.priority <= constants.OP_PRIO_LOWEST and
            op.priority >= constants.OP_PRIO_HIGHEST)

    # Set by the dependency check below if the job has to wait for another job
    waitjob = None

    if op.status != constants.OP_STATUS_CANCELING:
      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING)

      # Prepare to start opcode
      if self._MarkWaitlock(job, op):
        # Write to disk
        queue.UpdateJobUnlocked(job)

      assert op.status == constants.OP_STATUS_WAITING
      assert job.CalcStatus() == constants.JOB_STATUS_WAITING
      assert job.start_timestamp and op.start_timestamp
      assert waitjob is None

      # Check if waiting for a job is necessary
      waitjob = self._CheckDependencies(queue, job, opctx)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_ERROR)

      # Only execute the opcode if there is nothing to wait for and the
      # dependency check did not switch it to canceling or error
      if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                       constants.OP_STATUS_ERROR)):
        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        assert not opctx.jobdeps, "Not all dependencies were removed"

        # The queue lock is dropped while the opcode runs and is always
        # re-acquired afterwards, even if execution raises
        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        assert not waitjob

      if op.status == constants.OP_STATUS_WAITING:
        # Couldn't get locks in time
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

    if op.status == constants.OP_STATUS_WAITING or waitjob:
      # The opcode did not finish in this round; the job stays unfinalized
      finalize = False

      if not waitjob and opctx.CheckPriorityIncrease():
        # Priority was changed, need to update on-disk file
        queue.UpdateJobUnlocked(job)

      # Keep around for another round
      job.cur_opctx = opctx

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # In no case must the status be finalized here
      assert job.CalcStatus() == constants.JOB_STATUS_WAITING

    else:
      # Ensure all opcodes so far have been successful
      assert (opctx.index == 0 or
              compat.all(i.status == constants.OP_STATUS_SUCCESS
                         for i in job.ops[:opctx.index]))

      # Reset context
      job.cur_opctx = None

      if op.status == constants.OP_STATUS_SUCCESS:
        finalize = False

      elif op.status == constants.OP_STATUS_ERROR:
        # Ensure failed opcode has an exception as its result
        assert errors.GetEncodedError(job.ops[opctx.index].result)

        # All opcodes that are still unfinished are marked as failed too
        to_encode = errors.OpExecError("Preceding opcode failed")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        finalize = True

        # Consistency check
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
                          errors.GetEncodedError(i.result)
                          for i in job.ops[opctx.index:])

      elif op.status == constants.OP_STATUS_CANCELING:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
        finalize = True

      else:
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)

      if opctx.index == (opcount - 1):
        # Finalize on last opcode
        finalize = True

      if finalize:
        # All opcodes have been run, finalize job
        job.Finalize()

      # Write to disk. If the job status is final, this is the final write
      # allowed. Once the file has been written, it can be archived anytime.
      queue.UpdateJobUnlocked(job)

      assert not waitjob

      if finalize:
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
        return self.FINISHED

    assert not waitjob or queue.depmgr.JobWaiting(job)

    # Not finished yet: either another job must finish first (WAITDEP) or the
    # caller should simply schedule this job again (DEFER)
    if waitjob:
      return self.WAITDEP
    else:
      return self.DEFER
  finally:
    # Runs on every exit path, including the early returns above
    assert job.writable, "Job became read-only while being processed"
    queue.release()
1238
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread implementation used for processing jobs.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its per-job processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @return: whatever L{_RunTaskInner} returns

    """
    assert job.writable, "Expected writable job"

    # A job may show up in the task list more than once (e.g. when it
    # registered for a dependency and that dependency notifies before the
    # first worker is done); the per-job lock keeps a single worker active on
    # it at any time.
    processor_lock = job.processor_lock
    processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs one processing round for a job and acts on its outcome.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @raise workerpool.DeferTask: if the job processor asks for the job to be
      scheduled again
    @raise errors.ProgrammerError: if the job processor returns a value that
      is not one of its known result codes

    """
    queue = job.queue
    assert queue == self.pool.queue

    def setname_fn(op):
      """Names the current task after the job and, optionally, an opcode.

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    setname_fn(None)

    processor = mcpu.Processor(queue.context, job.id)

    # Opcode execution goes through a wrapper which updates the task name
    execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                               processor.ExecOpCode)

    outcome = _JobProcessor(queue, execop_fn, job)()

    if outcome == _JobProcessor.FINISHED:
      # Wake up jobs waiting for this one
      queue.depmgr.NotifyWaiters(job.id)
      return

    if outcome == _JobProcessor.DEFER:
      # Ask the worker pool to schedule the job again
      raise workerpool.DeferTask(priority=job.CalcPriority())

    # For WAITDEP nothing is left to do here, the dependency manager will
    # re-schedule the job; anything else is a programming error
    if outcome != _JobProcessor.WAITDEP:
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                   (outcome, ))

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the task name temporarily including it.

    The name is reset (by calling C{setname_fn(None)}) once execution is
    over, whether it succeeded or raised.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
      L{mcpu.Processor.ExecOpCode})
    @param op: opcode, passed to both callables
    @return: the return value of C{execop_fn}

    """
    setname_fn(op)
    try:
      outcome = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)
    return outcome

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @param op: opcode whose C{TinySummary()} is appended; may be None
    @rtype: string
    @return: C{"Job<id>"}, followed by C{"/<summary>"} if C{op} is given

    """
    name = "Job%s" % job.id

    if not op:
      return name

    return "/".join([name, op.TinySummary()])
1327
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} instances for a job queue.

  """
  def __init__(self, queue):
    """Initializes the base worker pool and remembers the queue.

    @param queue: the job queue; stored as C{self.queue}

    """
    pool_name = "Jq"
    base = super(_JobQueueWorkerPool, self)
    base.__init__(pool_name, JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
1338
1339 1340 -class _JobDependencyManager:
1341 """Keeps track of job dependencies. 1342 1343 """ 1344 (WAIT, 1345 ERROR, 1346 CANCEL, 1347 CONTINUE, 1348 WRONGSTATUS) = range(1, 6) 1349
1350 - def __init__(self, getstatus_fn, enqueue_fn):
1351 """Initializes this class. 1352 1353 """ 1354 self._getstatus_fn = getstatus_fn 1355 self._enqueue_fn = enqueue_fn 1356 1357 self._waiters = {} 1358 self._lock = locking.SharedLock("JobDepMgr")
1359 1360 @locking.ssynchronized(_LOCK, shared=1)
1361 - def GetLockInfo(self, requested): # pylint: disable=W0613
1362 """Retrieves information about waiting jobs. 1363 1364 @type requested: set 1365 @param requested: Requested information, see C{query.LQ_*} 1366 1367 """ 1368 # No need to sort here, that's being done by the lock manager and query 1369 # library. There are no priorities for notifying jobs, hence all show up as 1370 # one item under "pending". 1371 return [("job/%s" % job_id, None, None, 1372 [("job", [job.id for job in waiters])]) 1373 for job_id, waiters in self._waiters.items() 1374 if waiters]
1375 1376 @locking.ssynchronized(_LOCK, shared=1)
1377 - def JobWaiting(self, job):
1378 """Checks if a job is waiting. 1379 1380 """ 1381 return compat.any(job in jobs 1382 for jobs in self._waiters.values())
1383 1384 @locking.ssynchronized(_LOCK)
1385 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1386 """Checks if a dependency job has the requested status. 1387 1388 If the other job is not yet in a finalized status, the calling job will be 1389 notified (re-added to the workerpool) at a later point. 1390 1391 @type job: L{_QueuedJob} 1392 @param job: Job object 1393 @type dep_job_id: string 1394 @param dep_job_id: ID of dependency job 1395 @type dep_status: list 1396 @param dep_status: Required status 1397 1398 """ 1399 assert ht.TString(job.id) 1400 assert ht.TString(dep_job_id) 1401 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1402 1403 if job.id == dep_job_id: 1404 return (self.ERROR, "Job can't depend on itself") 1405 1406 # Get status of dependency job 1407 try: 1408 status = self._getstatus_fn(dep_job_id) 1409 except errors.JobLost, err: 1410 return (self.ERROR, "Dependency error: %s" % err) 1411 1412 assert status in constants.JOB_STATUS_ALL 1413 1414 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1415 1416 if status not in constants.JOBS_FINALIZED: 1417 # Register for notification and wait for job to finish 1418 job_id_waiters.add(job) 1419 return (self.WAIT, 1420 "Need to wait for job %s, wanted status '%s'" % 1421 (dep_job_id, dep_status)) 1422 1423 # Remove from waiters list 1424 if job in job_id_waiters: 1425 job_id_waiters.remove(job) 1426 1427 if (status == constants.JOB_STATUS_CANCELED and 1428 constants.JOB_STATUS_CANCELED not in dep_status): 1429 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1430 1431 elif not dep_status or status in dep_status: 1432 return (self.CONTINUE, 1433 "Dependency job %s finished with status '%s'" % 1434 (dep_job_id, status)) 1435 1436 else: 1437 return (self.WRONGSTATUS, 1438 "Dependency job %s finished with status '%s'," 1439 " not one of '%s' as required" % 1440 (dep_job_id, status, utils.CommaJoin(dep_status)))
1441
1442 - def _RemoveEmptyWaitersUnlocked(self):
1443 """Remove all jobs without actual waiters. 1444 1445 """ 1446 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1447 if not waiters]: 1448 del self._waiters[job_id]
1449
1450 - def NotifyWaiters(self, job_id):
1451 """Notifies all jobs waiting for a certain job ID. 1452 1453 @attention: Do not call until L{CheckAndRegister} returned a status other 1454 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1455 @type job_id: string 1456 @param job_id: Job ID 1457 1458 """ 1459 assert ht.TString(job_id) 1460 1461 self._lock.acquire() 1462 try: 1463 self._RemoveEmptyWaitersUnlocked() 1464 1465 jobs = self._waiters.pop(job_id, None) 1466 finally: 1467 self._lock.release() 1468 1469 if jobs: 1470 # Re-add jobs to workerpool 1471 logging.debug("Re-adding %s jobs which were waiting for job %s", 1472 len(jobs), job_id) 1473 self._enqueue_fn(jobs)
1474
1475 1476 -def _RequireOpenQueue(fn):
1477 """Decorator for "public" functions. 1478 1479 This function should be used for all 'public' functions. That is, 1480 functions usually called from other classes. Note that this should 1481 be applied only to methods (not plain functions), since it expects 1482 that the decorated function is called with a first argument that has 1483 a '_queue_filelock' argument. 1484 1485 @warning: Use this decorator only after locking.ssynchronized 1486 1487 Example:: 1488 @locking.ssynchronized(_LOCK) 1489 @_RequireOpenQueue 1490 def Example(self): 1491 pass 1492 1493 """ 1494 def wrapper(self, *args, **kwargs): 1495 # pylint: disable=W0212 1496 assert self._queue_filelock is not None, "Queue should be open" 1497 return fn(self, *args, **kwargs)
1498 return wrapper 1499
1500 1501 -class JobQueue(object):
1502 """Queue used to manage the jobs. 1503 1504 """
def __init__(self, context):
  """Constructor for JobQueue.

  Sets up the queue object, takes the queue file lock, reads the serial
  number, collects the master candidates and finally inspects the on-disk
  queue via L{_InspectQueue}. If the inspection fails, the worker pool is
  terminated and the exception re-raised.

  @type context: GanetiContext
  @param context: the context object for access to the configuration
                  data and other ganeti objects

  """
  self.context = context
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = netutils.Hostname.GetSysName()

  # The Big JobQueue lock. If a code block or method acquires it in shared
  # mode safe it must guarantee concurrency with all the code acquiring it in
  # shared mode, including itself. In order not to acquire it at all
  # concurrency must be guaranteed with all code acquiring it in shared mode
  # and all code acquiring it exclusively.
  big_lock = locking.SharedLock("JobQueue")
  self._lock = big_lock
  self.acquire = big_lock.acquire
  self.release = big_lock.release

  # Initialize the queue, and acquire the filelock.
  # This ensures no other process is working on the job queue.
  self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Name -> primary IP of all master candidates, without this node
  candidates = {}
  for node in self.context.cfg.GetAllNodesInfo().values():
    if node.master_candidate:
      candidates[node.name] = node.primary_ip
  candidates.pop(self._my_hostname, None)
  self._nodes = candidates

  # TODO: Check consistency across nodes

  self._queue_size = 0
  self._UpdateQueueSizeUnlocked()
  self._drained = jstore.CheckDrainFlag()

  # Job dependencies
  self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                      self._EnqueueJobs)
  self.context.glm.AddToLockMonitor(self.depmgr)

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
  try:
    self._InspectQueue()
  except:
    # Deliberately catching everything: workers must be terminated on any
    # failure; the exception is re-raised unchanged
    self._wpool.TerminateWorkers()
    raise
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
  """Loads the whole job queue and resumes unfinished jobs.

  Queued jobs are scheduled again as they are. Jobs found in waiting status
  have their unfinished opcodes reset to queued and are scheduled again.
  Jobs found running or canceling have their unfinished opcodes marked as
  failed and are finalized.

  This function needs the lock here because WorkerPool.AddTask() may start a
  job while we're still doing our work.

  """
  logging.info("Inspecting job queue")

  unfinished_statuses = (constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITING,
                         constants.JOB_STATUS_CANCELING)
  to_restart = []

  job_ids = self._GetJobIDsUnlocked()
  total = len(job_ids)
  last_idx = total - 1
  last_report = time.time()

  for (idx, job_id) in enumerate(job_ids):
    # Report progress every 1000 jobs, every 10 seconds and for the last job
    if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
        idx == last_idx):
      logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                   idx, last_idx, 100.0 * (idx + 1) / total)
      last_report = time.time()

    job = self._LoadJobUnlocked(job_id)

    # Loading can fail, in which case None is returned
    if job is None:
      continue

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      to_restart.append(job)
      continue

    if status not in unfinished_statuses:
      continue

    logging.warning("Unfinished job %s found: %s", job.id, job)

    if status == constants.JOB_STATUS_WAITING:
      # Nothing was executed yet, the job can simply be restarted
      job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
      to_restart.append(job)
    else:
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            "Unclean master daemon shutdown")
      job.Finalize()

    self.UpdateJobUnlocked(job)

  if to_restart:
    logging.info("Restarting %s jobs", len(to_restart))
    self._EnqueueJobsUnlocked(to_restart)

  logging.info("Job queue inspection finished")
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node):
  """Register a new node with the queue.

  The queue directory on the node is purged first. Master candidates then
  receive all non-archived job files plus the serial file and are added to
  the node list; other nodes are removed from the node list. Failing RPC
  calls are logged only.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  name = node.name
  assert name != self._my_hostname

  # Start with a clean queue directory on the node
  purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
  if purge_msg:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    name, purge_msg)

  if not node.master_candidate:
    # Forget the node if it was known; there is nothing to replicate
    self._nodes.pop(name, None)
    return

  # The whole queue excluding archived jobs, followed by the serial file
  to_upload = [self._GetJobPath(job_id)
               for job_id in self._GetJobIDsUnlocked()]
  to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

  for path in to_upload:
    content = utils.ReadFile(path)

    upload = rpc.RpcRunner.call_jobqueue_update([name],
                                                [node.primary_ip],
                                                path, content)
    upload_msg = upload[name].fail_msg
    if upload_msg:
      logging.error("Failed to upload file %s to node %s: %s",
                    path, name, upload_msg)

  self._nodes[name] = node.primary_ip
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def RemoveNode(self, node_name):
  """Callback called when removing nodes from the cluster.

  Unknown node names are silently ignored.

  @type node_name: str
  @param node_name: the name of the node to remove

  """
  nodes = self._nodes
  nodes.pop(node_name, None)
1680 1681 @staticmethod
1682 - def _CheckRpcResult(result, nodes, failmsg):
1683 """Verifies the status of an RPC call. 1684 1685 Since we aim to keep consistency should this node (the current 1686 master) fail, we will log errors if our rpc fail, and especially 1687 log the case when more than half of the nodes fails. 1688 1689 @param result: the data as returned from the rpc call 1690 @type nodes: list 1691 @param nodes: the list of nodes we made the call to 1692 @type failmsg: str 1693 @param failmsg: the identifier to be used for logging 1694 1695 """ 1696 failed = [] 1697 success = [] 1698 1699 for node in nodes: 1700 msg = result[node].fail_msg 1701 if msg: 1702 failed.append(node) 1703 logging.error("RPC call %s (%s) failed on node %s: %s", 1704 result[node].call, failmsg, node, msg) 1705 else: 1706 success.append(node) 1707 1708 # +1 for the master node 1709 if (len(success) + 1) < len(failed): 1710 # TODO: Handle failing nodes 1711 logging.error("More than half of the nodes failed")
1712
1713 - def _GetNodeIp(self):
1714 """Helper for returning the node name/ip list. 1715 1716 @rtype: (list, list) 1717 @return: a tuple of two lists, the first one with the node 1718 names and the second one with the node addresses 1719 1720 """ 1721 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1722 name_list = self._nodes.keys() 1723 addr_list = [self._nodes[name] for name in name_list] 1724 return name_list, addr_list
1725
def _UpdateJobQueueFile(self, file_name, data, replicate):
  """Writes a file locally and then replicates it to all nodes.

  The file is written with the masterd uid/gid as owner. If requested, the
  new contents are then sent to all nodes in C{self._nodes} and the RPC
  result is checked through L{_CheckRpcResult}.

  @type file_name: str
  @param file_name: the path of the file to be replicated
  @type data: str
  @param data: the new contents of the file
  @type replicate: boolean
  @param replicate: whether to spread the changes to the remote nodes

  """
  ents = runtime.GetEnts()
  utils.WriteFile(file_name, data=data,
                  uid=ents.masterd_uid, gid=ents.masterd_gid)

  if not replicate:
    return

  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                  file_name, data)
  self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
1748
def _RenameFilesUnlocked(self, rename):
  """Renames files locally and then replicates the change.

  All renames are done on the local node first (creating target
  directories as needed); afterwards the same list is sent to all nodes in
  C{self._nodes} and the RPC result is checked through L{_CheckRpcResult}.

  @type rename: list of (old, new)
  @param rename: List containing tuples mapping old to new names

  """
  # Local renames first
  for (src, dst) in rename:
    utils.RenameFile(src, dst, mkdir=True)

  # ... then the same on all other nodes
  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                  rename)
  self._CheckRpcResult(rpc_result, self._nodes,
                       "Renaming files (%r)" % rename)
@staticmethod
def _FormatJobID(job_id):
  """Convert a job ID to string format.

  Currently this just does C{str(job_id)} after performing some
  checks, but if we want to change the job id format this will
  abstract this change.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: str
  @return: the formatted job id
  @raise errors.ProgrammerError: if the ID is not an integer or is negative

  """
  if not isinstance(job_id, (int, long)):
    # The value is wrapped in a tuple so that a tuple passed by mistake is
    # reported through ProgrammerError instead of breaking the formatting
    raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
  if job_id < 0:
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)

  return str(job_id)
@classmethod
def _GetArchiveDirectory(cls, job_id):
  """Returns the archive directory for a job.

  The directory name is the job ID divided by
  C{JOBS_PER_ARCHIVE_DIRECTORY}, rounded down.

  @type job_id: str
  @param job_id: Job identifier
  @rtype: str
  @return: Directory name

  """
  # Explicit floor division: the result must stay an integer even if true
  # division is in effect
  return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1800
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster. The new
  serial number is written to the serial file (and replicated) before the
  in-memory counter is advanced.

  @type count: integer
  @param count: how many serials to return
  @rtype: list
  @return: C{count} formatted job identifiers, in increasing order

  """
  assert ht.TPositiveInt(count)

  previous = self._last_serial
  new_serial = previous + count

  # Persist the new serial first
  self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                           "%s\n" % new_serial, True)

  job_ids = []
  for number in range(previous + 1, new_serial + 1):
    job_ids.append(self._FormatJobID(number))

  # Only reached if writing the file worked
  self._last_serial = new_serial

  assert len(job_ids) == count

  return job_ids
@staticmethod
def _GetJobPath(job_id):
  """Returns the job file for a given job id.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the job file, C{job-<id>} below the queue directory

  """
  file_name = "job-%s" % job_id
  return utils.PathJoin(constants.QUEUE_DIR, file_name)
@classmethod
def _GetArchivedJobPath(cls, job_id):
  """Returns the archived job file for a given job id.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the archived job file, C{job-<id>} inside the
    job's archive directory (see L{_GetArchiveDirectory})

  """
  archive_root = constants.JOB_QUEUE_ARCHIVE_DIR
  archive_subdir = cls._GetArchiveDirectory(job_id)
  file_name = "job-%s" % job_id
  return utils.PathJoin(archive_root, archive_subdir, file_name)
@staticmethod
def _GetJobIDsUnlocked(sort=True):
  """Return all known job IDs.

  The method only looks at disk because it's a requirement that all
  jobs are present on disk (so in the _memcache we don't have any
  extra IDs). A file in the queue directory counts as a job file if its
  name matches C{constants.JOB_FILE_RE}; the first group is the job ID.

  @type sort: boolean
  @param sort: perform sorting on the returned job ids
  @rtype: list
  @return: the list of job IDs

  """
  matches = [constants.JOB_FILE_RE.match(name)
             for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
  job_ids = [m.group(1) for m in matches if m]

  if not sort:
    return job_ids

  return utils.NiceSort(job_ids)
1878
def _LoadJobUnlocked(self, job_id):
  """Loads a job from the disk or memory.

  Given a job id, this will return the cached job object if
  existing, or try to load the job from the disk. If loading from
  disk, it will also add the job to the cache. A job file which can't be
  parsed is moved to the archive and None is returned.

  @param job_id: the job id
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  cached = self._memcache.get(job_id, None)
  if cached:
    logging.debug("Found job %s in memcache", job_id)
    assert cached.writable, "Found read-only job in memcache"
    return cached

  try:
    job = self._LoadJobFromDisk(job_id, False)
  except errors.JobFileCorrupted:
    live_path = self._GetJobPath(job_id)
    archived_path = self._GetArchivedJobPath(job_id)
    if live_path != archived_path:
      # non-archived case
      logging.exception("Can't parse job %s, will archive.", job_id)
      self._RenameFilesUnlocked([(live_path, archived_path)])
    else:
      # job already archived (future case)
      logging.exception("Can't parse job %s", job_id)
    return None

  if job is None:
    return job

  assert job.writable, "Job just loaded is not writable"

  self._memcache[job_id] = job
  logging.debug("Added job %s to the cache", job_id)
  return job
1918
1919 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1920 """Load the given job file from disk. 1921 1922 Given a job file, read, load and restore it in a _QueuedJob format. 1923 1924 @type job_id: string 1925 @param job_id: job identifier 1926 @type try_archived: bool 1927 @param try_archived: Whether to try loading an archived job 1928 @rtype: L{_QueuedJob} or None 1929 @return: either None or the job object 1930 1931 """ 1932 path_functions = [(self._GetJobPath, True)] 1933 1934 if try_archived: 1935 path_functions.append((self._GetArchivedJobPath, False)) 1936 1937 raw_data = None 1938 writable_default = None 1939 1940 for (fn, writable_default) in path_functions: 1941 filepath = fn(job_id) 1942 logging.debug("Loading job from %s", filepath) 1943 try: 1944 raw_data = utils.ReadFile(filepath) 1945 except EnvironmentError, err: 1946 if err.errno != errno.ENOENT: 1947 raise 1948 else: 1949 break 1950 1951 if not raw_data: 1952 return None 1953 1954 if writable is None: 1955 writable = writable_default 1956 1957 try: 1958 data = serializer.LoadJson(raw_data) 1959 job = _QueuedJob.Restore(self, data, writable) 1960 except Exception, err: # pylint: disable=W0703 1961 raise errors.JobFileCorrupted(err) 1962 1963 return job
1964
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
  """Load the given job file from disk, logging errors.

  Same as L{_LoadJobFromDisk}, except that a corrupted job file or an
  environment error while reading is logged and turned into a None result.

  @type job_id: string
  @param job_id: job identifier
  @type try_archived: bool
  @param try_archived: Whether to try loading an archived job
  @param writable: passed on to L{_LoadJobFromDisk}
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  try:
    loaded = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
  except (errors.JobFileCorrupted, EnvironmentError):
    logging.exception("Can't load/parse job %s", job_id)
    loaded = None

  return loaded
1985
def _UpdateQueueSizeUnlocked(self):
  """Recomputes the queue size from the job files found on disk.

  """
  # Only the number of IDs matters, hence no sorting
  known_ids = self._GetJobIDsUnlocked(sort=False)
  self._queue_size = len(known_ids)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag):
  """Sets the drain flag for the queue.

  The flag is stored through L{jstore.SetDrainFlag} and mirrored in
  C{self._drained}.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @rtype: boolean
  @return: always True

  """
  # Persist first, then update the in-memory copy
  jstore.SetDrainFlag(drain_flag)
  self._drained = drain_flag

  return True
@_RequireOpenQueue
def _SubmitJobUnlocked(self, job_id, ops):
  """Create and store a new job.

  The job is validated, written to disk and added to the in-memory cache;
  scheduling it on the worker pool is left to the caller.

  @type job_id: job ID
  @param job_id: the job ID for the new job
  @type ops: list
  @param ops: The list of OpCodes that will become the new job.
  @rtype: L{_QueuedJob}
  @return: the job object to be queued
  @raise errors.JobQueueDrainError: if the job queue is marked for draining
  @raise errors.JobQueueFull: if the job queue has too many jobs in it
  @raise errors.GenericError: If an opcode is not valid

  """
  # Ok when sharing the big job queue lock, as the drain file is created when
  # the lock is exclusive.
  if self._drained:
    raise errors.JobQueueDrainError("Job queue is drained, refusing job")

  if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
    raise errors.JobQueueFull()

  new_job = _QueuedJob(self, job_id, ops, True)

  # Validate priority and dependencies of every opcode
  for (op_idx, queued_op) in enumerate(new_job.ops):
    priority = queued_op.priority
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                " are %s" % (op_idx, priority, allowed))

    deps = getattr(queued_op.input, opcodes.DEPEND_ATTR, None)
    if not opcodes.TNoRelativeJobDependencies(deps):
      raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                " match %s: %s" %
                                (op_idx, opcodes.TNoRelativeJobDependencies,
                                 deps))

  # Write to disk
  self.UpdateJobUnlocked(new_job)

  self._queue_size += 1

  logging.debug("Adding new job %s to the cache", job_id)
  self._memcache[job_id] = new_job

  return new_job
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SubmitJob(self, ops):
  """Create, store and schedule a new job.

  @see: L{_SubmitJobUnlocked}
  @param ops: the opcodes of the new job
  @return: the ID of the new job

  """
  (job_id, ) = self._NewSerialsUnlocked(1)
  new_job = self._SubmitJobUnlocked(job_id, ops)
  self._EnqueueJobsUnlocked([new_job])
  return job_id
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SubmitManyJobs(self, jobs):
  """Create, store and schedule multiple jobs.

  @see: L{_SubmitJobUnlocked}
  @param jobs: list of opcode lists, one per job
  @return: the per-job results as computed by L{_SubmitManyJobsUnlocked}

  """
  # One new serial per submitted job
  new_ids = self._NewSerialsUnlocked(len(jobs))

  submitted = self._SubmitManyJobsUnlocked(jobs, new_ids, [])
  (results, added_jobs) = submitted

  self._EnqueueJobsUnlocked(added_jobs)

  return results
@staticmethod
def _FormatSubmitError(msg, ops):
  """Formats errors which occurred while submitting a job.

  @param msg: the error message
  @param ops: the opcodes of the job; their C{Summary()} is appended
  @rtype: string

  """
  summaries = utils.CommaJoin(op.Summary() for op in ops)
  return "%s; opcodes %s" % (msg, summaries)
@staticmethod
def _ResolveJobDependencies(resolve_fn, deps):
  """Resolves relative job IDs in dependencies.

  @type resolve_fn: callable
  @param resolve_fn: Function to resolve a relative job ID; expected to
    raise C{IndexError} if the ID can't be resolved
  @type deps: list
  @param deps: Dependencies, as C{(job ID, status)} pairs
  @rtype: tuple
  @return: C{(True, resolved dependencies)} on success, C{(False, error
    message)} as soon as one relative ID can't be resolved

  """
  resolved = []

  for (dep_job_id, dep_status) in deps:
    if not ht.TRelativeJobId(dep_job_id):
      # Already an absolute job ID
      resolved.append((dep_job_id, dep_status))
      continue

    assert ht.TInt(dep_job_id) and dep_job_id < 0
    try:
      absolute_id = resolve_fn(dep_job_id)
    except IndexError:
      # Abort
      return (False, "Unable to resolve relative job ID %s" % dep_job_id)

    resolved.append((absolute_id, dep_status))

  return (True, resolved)
2124
2125 - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2126 """Create and store multiple jobs. 2127 2128 @see: L{_SubmitJobUnlocked} 2129 2130 """ 2131 results = [] 2132 added_jobs = [] 2133 2134 def resolve_fn(job_idx, reljobid): 2135 assert reljobid < 0 2136 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2137 2138 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): 2139 for op in ops: 2140 if getattr(op, opcodes.DEPEND_ATTR, None): 2141 (status, data) = \ 2142 self._ResolveJobDependencies(compat.partial(resolve_fn, idx), 2143 op.depends) 2144 if not status: 2145 # Abort resolving dependencies 2146 assert ht.TNonEmptyString(data), "No error message" 2147 break 2148 # Use resolved dependencies 2149 op.depends = data 2150 else: 2151 try: 2152 job = self._SubmitJobUnlocked(job_id, ops) 2153 except errors.GenericError, err: 2154 status = False 2155 data = self._FormatSubmitError(str(err), ops) 2156 else: 2157 status = True 2158 data = job_id 2159 added_jobs.append(job) 2160 2161 results.append((status, data)) 2162 2163 return (results, added_jobs)
@locking.ssynchronized(_LOCK)
def _EnqueueJobs(self, jobs):
  """Locked variant of L{_EnqueueJobsUnlocked}.

  @type jobs: list
  @param jobs: List of all jobs
  @return: the return value of L{_EnqueueJobsUnlocked}

  """
  outcome = self._EnqueueJobsUnlocked(jobs)
  return outcome
2174
def _EnqueueJobsUnlocked(self, jobs):
  """Helper function to add jobs to worker pool's queue.

  Every job becomes one task, with the job's calculated priority as task
  priority. The queue lock must be held in exclusive mode.

  @type jobs: list
  @param jobs: List of all jobs

  """
  assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

  # One single-element argument tuple and one priority per job
  tasks = [(job, ) for job in jobs]
  priorities = [job.CalcPriority() for job in jobs]

  self._wpool.AddManyTasks(tasks, priority=priorities)
2185
def _GetJobStatusForDependencies(self, job_id):
  """Gets the status of a job for dependencies.

  @type job_id: string
  @param job_id: Job ID; values which are not strings are converted
      using L{_FormatJobID} first
  @return: the value returned by the job's C{CalcStatus} method
  @raise errors.JobLost: If job can't be found

  """
  if not isinstance(job_id, basestring):
    job_id = self._FormatJobID(job_id)

  # Not using in-memory cache as doing so would require an exclusive lock

  # Try to load from disk
  job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

  # The existence check must come before touching any attribute of the
  # job: a failed load yields no job object, and the documented contract
  # is to raise JobLost in that case, not AttributeError
  if not job:
    raise errors.JobLost("Job %s not found" % job_id)

  assert not job.writable, "Got writable job" # pylint: disable=E1101

  return job.CalcStatus()

@_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True):
  """Write a modified job back to its queue file.

  Must be called after a job has been changed so that the new state is
  stored on disk and, if requested, replicated to the other nodes.

  @type job: L{_QueuedJob}
  @param job: the changed job
  @type replicate: boolean
  @param replicate: whether to replicate the change to remote nodes

  """
  if __debug__:
    # A job must have an end timestamp exactly when it is finalized
    is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
    has_end_timestamp = job.end_timestamp is not None
    assert is_finalized == has_end_timestamp
    assert job.writable, "Can't update read-only job"

  job_file = self._GetJobPath(job.id)
  payload = serializer.DumpJson(job.Serialize(), indent=False)

  logging.debug("Writing job %s to %s", job.id, job_file)
  self._UpdateJobQueueFile(job_file, payload, replicate)
2232
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Waits for changes in a job.

  The actual waiting is delegated to a L{_WaitForJobChangesHelper}
  instance, which receives the path of the job file and a function
  loading the job read-only from disk.

  @type job_id: string
  @param job_id: Job identifier
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @rtype: tuple (job info, log entries)
  @return: a tuple of the job information as required via
      the fields parameter, and the log entries as a list

      if the job has not changed and the timeout has expired,
      we instead return a special value,
      L{constants.JOB_NOTCHANGED}, which should be interpreted
      as such by the clients

  """
  loader = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                          writable=False)
  waiter = _WaitForJobChangesHelper()
  job_path = self._GetJobPath(job_id)

  return waiter(job_path, loader,
                fields, prev_job_info, prev_log_serial, timeout)

@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def CancelJob(self, job_id):
  """Cancels a job.

  This will only succeed if the job has not started yet.

  @type job_id: string
  @param job_id: job ID of job to be cancelled.
  @rtype: tuple
  @return: C{(success, message)}; C{(False, "Job ... not found")} if the
      job could not be loaded, otherwise the result of the job's
      C{Cancel} method

  """
  logging.info("Cancelling job %s", job_id)

  job = self._LoadJobUnlocked(job_id)

  if job:
    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  logging.debug("Job %s not found", job_id)
  return (False, "Job %s not found" % job_id)

@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
  """Archives jobs.

  Only jobs whose status is in L{constants.JOBS_FINALIZED} are
  considered; their files are renamed from the job path to the archived
  job path. Other jobs are skipped.

  @type jobs: list of L{_QueuedJob}
  @param jobs: Job objects
  @rtype: int
  @return: Number of archived jobs

  """
  finalized_jobs = []
  renames = []

  for job in jobs:
    assert job.writable, "Can't archive read-only job"

    if job.CalcStatus() in constants.JOBS_FINALIZED:
      finalized_jobs.append(job)
      renames.append((self._GetJobPath(job.id),
                      self._GetArchivedJobPath(job.id)))
    else:
      logging.debug("Job %s is not yet done", job.id)

  # TODO: What if 1..n files fail to rename?
  self._RenameFilesUnlocked(renames)

  logging.debug("Successfully archived job(s) %s",
                utils.CommaJoin(done.id for done in finalized_jobs))

  # Since we haven't quite checked, above, if we succeeded or failed renaming
  # the files, we update the cached queue size from the filesystem. When we
  # get around to fix the TODO: above, we can use the number of actually
  # archived jobs to fix this.
  self._UpdateQueueSizeUnlocked()

  return len(finalized_jobs)

@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ArchiveJob(self, job_id):
  """Archives a single job.

  Loads the job and hands it to L{_ArchiveJobsUnlocked}.

  @type job_id: string
  @param job_id: Job ID of job to be archived.
  @rtype: bool
  @return: Whether job was archived

  """
  logging.info("Archiving job %s", job_id)

  job = self._LoadJobUnlocked(job_id)

  if job:
    archived = self._ArchiveJobsUnlocked([job])
    return archived == 1

  logging.debug("Job %s not found", job_id)
  return False

@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout):
  """Archives all jobs based on age.

  The method will archive all jobs which are older than the age
  parameter. The age of a job is taken from its end timestamp; for jobs
  that don't have an end timestamp, the start timestamp will be
  considered, and if that is missing as well, the received timestamp.
  The special '-1' age will cause archival of all jobs (that are not
  running or queued).

  @type age: int
  @param age: the minimum age in seconds
  @type timeout: number
  @param timeout: time in seconds, counted from the start of the call,
      after which no further jobs are examined
  @rtype: tuple
  @return: C{(archived, unchecked)}: the number of archived jobs and the
      number of job IDs which were not examined because the timeout
      expired

  """
  logging.info("Archiving jobs with age more than %s seconds", age)

  now = time.time()
  end_time = now + timeout
  archived_count = 0
  last_touched = 0

  all_job_ids = self._GetJobIDsUnlocked()
  pending = []
  for idx, job_id in enumerate(all_job_ids):
    # Not optimal because jobs could be pending
    # TODO: Measure average duration for job archival and take number of
    # pending jobs into account.
    if time.time() > end_time:
      break

    # Only count the job as examined once we know we're going to look at
    # it; otherwise the job we stopped at would be missing from the
    # "unchecked" count returned below
    last_touched = idx + 1

    # Returns None if the job failed to load
    job = self._LoadJobUnlocked(job_id)
    if job:
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # Only the first element of the timestamp is compared with "now"
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

  if pending:
    archived_count += self._ArchiveJobsUnlocked(pending)

  return (archived_count, len(all_job_ids) - last_touched)
2409
def QueryJobs(self, job_ids, fields):
  """Returns a list of jobs in queue.

  When specific job IDs are requested, a job which can't be loaded is
  reported as C{None} in its position. When all jobs are listed, such
  jobs are left out of the result.

  @type job_ids: list
  @param job_ids: sequence of job identifiers or None for all
  @type fields: list
  @param fields: names of fields to return
  @rtype: list
  @return: list one element per job, each element being list with
      the requested fields

  """
  list_all = not job_ids
  if list_all:
    # Since files are added to/removed from the queue atomically, there's no
    # risk of getting the job ids in an inconsistent state.
    job_ids = self._GetJobIDsUnlocked()

  jobs = []
  for job_id in job_ids:
    job = self.SafeLoadJobFromDisk(job_id, True)
    if job is None:
      if not list_all:
        jobs.append(None)
    else:
      jobs.append(job.GetInfo(fields))

  return jobs

@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Shutdown(self):
  """Stops the job queue.

  This terminates the workers of the worker pool, then closes the queue
  file lock and drops the reference to it.

  """
  self._wpool.TerminateWorkers()

  filelock = self._queue_filelock
  filelock.Close()
  self._queue_filelock = None
2451