Package ganeti :: Module jqueue
[hide private]
[frames] | no frames]

Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import logging 
  33  import errno 
  34  import time 
  35  import weakref 
  36  import threading 
  37  import itertools 
  38   
  39  try: 
  40    # pylint: disable=E0611 
  41    from pyinotify import pyinotify 
  42  except ImportError: 
  43    import pyinotify 
  44   
  45  from ganeti import asyncnotifier 
  46  from ganeti import constants 
  47  from ganeti import serializer 
  48  from ganeti import workerpool 
  49  from ganeti import locking 
  50  from ganeti import opcodes 
  51  from ganeti import errors 
  52  from ganeti import mcpu 
  53  from ganeti import utils 
  54  from ganeti import jstore 
  55  from ganeti import rpc 
  56  from ganeti import runtime 
  57  from ganeti import netutils 
  58  from ganeti import compat 
  59  from ganeti import ht 
  60  from ganeti import query 
  61  from ganeti import qlang 
  62   
  63   
  64  JOBQUEUE_THREADS = 25 
  65  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
  66   
  67  # member lock names to be passed to @ssynchronized decorator 
  68  _LOCK = "_lock" 
  69  _QUEUE = "_queue" 
70 71 72 -class CancelJob(Exception):
73 """Special exception to cancel a job. 74 75 """
76
77 78 -class QueueShutdown(Exception):
79 """Special exception to abort a job when the job queue is shutting down. 80 81 """
82
83 84 -def TimeStampNow():
85 """Returns the current timestamp. 86 87 @rtype: tuple 88 @return: the current time in the (seconds, microseconds) format 89 90 """ 91 return utils.SplitTime(time.time())
92
93 94 -class _SimpleJobQuery:
95 """Wrapper for job queries. 96 97 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. 98 99 """
100 - def __init__(self, fields):
101 """Initializes this class. 102 103 """ 104 self._query = query.Query(query.JOB_FIELDS, fields)
105
106 - def __call__(self, job):
107 """Executes a job query using cached field list. 108 109 """ 110 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
111
112 113 -class _QueuedOpCode(object):
114 """Encapsulates an opcode object. 115 116 @ivar log: holds the execution log and consists of tuples 117 of the form C{(log_serial, timestamp, level, message)} 118 @ivar input: the OpCode we encapsulate 119 @ivar status: the current status 120 @ivar result: the result of the LU execution 121 @ivar start_timestamp: timestamp for the start of the execution 122 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 123 @ivar stop_timestamp: timestamp for the end of the execution 124 125 """ 126 __slots__ = ["input", "status", "result", "log", "priority", 127 "start_timestamp", "exec_timestamp", "end_timestamp", 128 "__weakref__"] 129
130 - def __init__(self, op):
131 """Initializes instances of this class. 132 133 @type op: L{opcodes.OpCode} 134 @param op: the opcode we encapsulate 135 136 """ 137 self.input = op 138 self.status = constants.OP_STATUS_QUEUED 139 self.result = None 140 self.log = [] 141 self.start_timestamp = None 142 self.exec_timestamp = None 143 self.end_timestamp = None 144 145 # Get initial priority (it might change during the lifetime of this opcode) 146 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
147 148 @classmethod
149 - def Restore(cls, state):
150 """Restore the _QueuedOpCode from the serialized form. 151 152 @type state: dict 153 @param state: the serialized state 154 @rtype: _QueuedOpCode 155 @return: a new _QueuedOpCode instance 156 157 """ 158 obj = _QueuedOpCode.__new__(cls) 159 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 160 obj.status = state["status"] 161 obj.result = state["result"] 162 obj.log = state["log"] 163 obj.start_timestamp = state.get("start_timestamp", None) 164 obj.exec_timestamp = state.get("exec_timestamp", None) 165 obj.end_timestamp = state.get("end_timestamp", None) 166 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 167 return obj
168
169 - def Serialize(self):
170 """Serializes this _QueuedOpCode. 171 172 @rtype: dict 173 @return: the dictionary holding the serialized state 174 175 """ 176 return { 177 "input": self.input.__getstate__(), 178 "status": self.status, 179 "result": self.result, 180 "log": self.log, 181 "start_timestamp": self.start_timestamp, 182 "exec_timestamp": self.exec_timestamp, 183 "end_timestamp": self.end_timestamp, 184 "priority": self.priority, 185 }
186
187 188 -class _QueuedJob(object):
189 """In-memory job representation. 190 191 This is what we use to track the user-submitted jobs. Locking must 192 be taken care of by users of this class. 193 194 @type queue: L{JobQueue} 195 @ivar queue: the parent queue 196 @ivar id: the job ID 197 @type ops: list 198 @ivar ops: the list of _QueuedOpCode that constitute the job 199 @type log_serial: int 200 @ivar log_serial: holds the index for the next log entry 201 @ivar received_timestamp: the timestamp for when the job was received 202 @ivar start_timestmap: the timestamp for start of execution 203 @ivar end_timestamp: the timestamp for end of execution 204 @ivar writable: Whether the job is allowed to be modified 205 206 """ 207 # pylint: disable=W0212 208 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 209 "received_timestamp", "start_timestamp", "end_timestamp", 210 "__weakref__", "processor_lock", "writable"] 211
212 - def __init__(self, queue, job_id, ops, writable):
213 """Constructor for the _QueuedJob. 214 215 @type queue: L{JobQueue} 216 @param queue: our parent queue 217 @type job_id: job_id 218 @param job_id: our job id 219 @type ops: list 220 @param ops: the list of opcodes we hold, which will be encapsulated 221 in _QueuedOpCodes 222 @type writable: bool 223 @param writable: Whether job can be modified 224 225 """ 226 if not ops: 227 raise errors.GenericError("A job needs at least one opcode") 228 229 self.queue = queue 230 self.id = job_id 231 self.ops = [_QueuedOpCode(op) for op in ops] 232 self.log_serial = 0 233 self.received_timestamp = TimeStampNow() 234 self.start_timestamp = None 235 self.end_timestamp = None 236 237 self._InitInMemory(self, writable)
238 239 @staticmethod
240 - def _InitInMemory(obj, writable):
241 """Initializes in-memory variables. 242 243 """ 244 obj.writable = writable 245 obj.ops_iter = None 246 obj.cur_opctx = None 247 248 # Read-only jobs are not processed and therefore don't need a lock 249 if writable: 250 obj.processor_lock = threading.Lock() 251 else: 252 obj.processor_lock = None
253
254 - def __repr__(self):
255 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 256 "id=%s" % self.id, 257 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 258 259 return "<%s at %#x>" % (" ".join(status), id(self))
260 261 @classmethod
262 - def Restore(cls, queue, state, writable):
263 """Restore a _QueuedJob from serialized state: 264 265 @type queue: L{JobQueue} 266 @param queue: to which queue the restored job belongs 267 @type state: dict 268 @param state: the serialized state 269 @type writable: bool 270 @param writable: Whether job can be modified 271 @rtype: _JobQueue 272 @return: the restored _JobQueue instance 273 274 """ 275 obj = _QueuedJob.__new__(cls) 276 obj.queue = queue 277 obj.id = state["id"] 278 obj.received_timestamp = state.get("received_timestamp", None) 279 obj.start_timestamp = state.get("start_timestamp", None) 280 obj.end_timestamp = state.get("end_timestamp", None) 281 282 obj.ops = [] 283 obj.log_serial = 0 284 for op_state in state["ops"]: 285 op = _QueuedOpCode.Restore(op_state) 286 for log_entry in op.log: 287 obj.log_serial = max(obj.log_serial, log_entry[0]) 288 obj.ops.append(op) 289 290 cls._InitInMemory(obj, writable) 291 292 return obj
293
294 - def Serialize(self):
295 """Serialize the _JobQueue instance. 296 297 @rtype: dict 298 @return: the serialized state 299 300 """ 301 return { 302 "id": self.id, 303 "ops": [op.Serialize() for op in self.ops], 304 "start_timestamp": self.start_timestamp, 305 "end_timestamp": self.end_timestamp, 306 "received_timestamp": self.received_timestamp, 307 }
308
309 - def CalcStatus(self):
310 """Compute the status of this job. 311 312 This function iterates over all the _QueuedOpCodes in the job and 313 based on their status, computes the job status. 314 315 The algorithm is: 316 - if we find a cancelled, or finished with error, the job 317 status will be the same 318 - otherwise, the last opcode with the status one of: 319 - waitlock 320 - canceling 321 - running 322 323 will determine the job status 324 325 - otherwise, it means either all opcodes are queued, or success, 326 and the job status will be the same 327 328 @return: the job status 329 330 """ 331 status = constants.JOB_STATUS_QUEUED 332 333 all_success = True 334 for op in self.ops: 335 if op.status == constants.OP_STATUS_SUCCESS: 336 continue 337 338 all_success = False 339 340 if op.status == constants.OP_STATUS_QUEUED: 341 pass 342 elif op.status == constants.OP_STATUS_WAITING: 343 status = constants.JOB_STATUS_WAITING 344 elif op.status == constants.OP_STATUS_RUNNING: 345 status = constants.JOB_STATUS_RUNNING 346 elif op.status == constants.OP_STATUS_CANCELING: 347 status = constants.JOB_STATUS_CANCELING 348 break 349 elif op.status == constants.OP_STATUS_ERROR: 350 status = constants.JOB_STATUS_ERROR 351 # The whole job fails if one opcode failed 352 break 353 elif op.status == constants.OP_STATUS_CANCELED: 354 status = constants.OP_STATUS_CANCELED 355 break 356 357 if all_success: 358 status = constants.JOB_STATUS_SUCCESS 359 360 return status
361
362 - def CalcPriority(self):
363 """Gets the current priority for this job. 364 365 Only unfinished opcodes are considered. When all are done, the default 366 priority is used. 367 368 @rtype: int 369 370 """ 371 priorities = [op.priority for op in self.ops 372 if op.status not in constants.OPS_FINALIZED] 373 374 if not priorities: 375 # All opcodes are done, assume default priority 376 return constants.OP_PRIO_DEFAULT 377 378 return min(priorities)
379
380 - def GetLogEntries(self, newer_than):
381 """Selectively returns the log entries. 382 383 @type newer_than: None or int 384 @param newer_than: if this is None, return all log entries, 385 otherwise return only the log entries with serial higher 386 than this value 387 @rtype: list 388 @return: the list of the log entries selected 389 390 """ 391 if newer_than is None: 392 serial = -1 393 else: 394 serial = newer_than 395 396 entries = [] 397 for op in self.ops: 398 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 399 400 return entries
401
402 - def GetInfo(self, fields):
403 """Returns information about a job. 404 405 @type fields: list 406 @param fields: names of fields to return 407 @rtype: list 408 @return: list with one element for each field 409 @raise errors.OpExecError: when an invalid field 410 has been passed 411 412 """ 413 return _SimpleJobQuery(fields)(self)
414
415 - def MarkUnfinishedOps(self, status, result):
416 """Mark unfinished opcodes with a given status and result. 417 418 This is an utility function for marking all running or waiting to 419 be run opcodes with a given status. Opcodes which are already 420 finalised are not changed. 421 422 @param status: a given opcode status 423 @param result: the opcode result 424 425 """ 426 not_marked = True 427 for op in self.ops: 428 if op.status in constants.OPS_FINALIZED: 429 assert not_marked, "Finalized opcodes found after non-finalized ones" 430 continue 431 op.status = status 432 op.result = result 433 not_marked = False
434
435 - def Finalize(self):
436 """Marks the job as finalized. 437 438 """ 439 self.end_timestamp = TimeStampNow()
440
441 - def Cancel(self):
442 """Marks job as canceled/-ing if possible. 443 444 @rtype: tuple; (bool, string) 445 @return: Boolean describing whether job was successfully canceled or marked 446 as canceling and a text message 447 448 """ 449 status = self.CalcStatus() 450 451 if status == constants.JOB_STATUS_QUEUED: 452 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 453 "Job canceled by request") 454 self.Finalize() 455 return (True, "Job %s canceled" % self.id) 456 457 elif status == constants.JOB_STATUS_WAITING: 458 # The worker will notice the new status and cancel the job 459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 460 return (True, "Job %s will be canceled" % self.id) 461 462 else: 463 logging.debug("Job %s is no longer waiting in the queue", self.id) 464 return (False, "Job %s is no longer waiting in the queue" % self.id)
465
466 467 -class _OpExecCallbacks(mcpu.OpExecCbBase):
468 - def __init__(self, queue, job, op):
469 """Initializes this class. 470 471 @type queue: L{JobQueue} 472 @param queue: Job queue 473 @type job: L{_QueuedJob} 474 @param job: Job object 475 @type op: L{_QueuedOpCode} 476 @param op: OpCode 477 478 """ 479 assert queue, "Queue is missing" 480 assert job, "Job is missing" 481 assert op, "Opcode is missing" 482 483 self._queue = queue 484 self._job = job 485 self._op = op
486
487 - def _CheckCancel(self):
488 """Raises an exception to cancel the job if asked to. 489 490 """ 491 # Cancel here if we were asked to 492 if self._op.status == constants.OP_STATUS_CANCELING: 493 logging.debug("Canceling opcode") 494 raise CancelJob() 495 496 # See if queue is shutting down 497 if not self._queue.AcceptingJobsUnlocked(): 498 logging.debug("Queue is shutting down") 499 raise QueueShutdown()
500 501 @locking.ssynchronized(_QUEUE, shared=1)
502 - def NotifyStart(self):
503 """Mark the opcode as running, not lock-waiting. 504 505 This is called from the mcpu code as a notifier function, when the LU is 506 finally about to start the Exec() method. Of course, to have end-user 507 visible results, the opcode must be initially (before calling into 508 Processor.ExecOpCode) set to OP_STATUS_WAITING. 509 510 """ 511 assert self._op in self._job.ops 512 assert self._op.status in (constants.OP_STATUS_WAITING, 513 constants.OP_STATUS_CANCELING) 514 515 # Cancel here if we were asked to 516 self._CheckCancel() 517 518 logging.debug("Opcode is now running") 519 520 self._op.status = constants.OP_STATUS_RUNNING 521 self._op.exec_timestamp = TimeStampNow() 522 523 # And finally replicate the job status 524 self._queue.UpdateJobUnlocked(self._job)
525 526 @locking.ssynchronized(_QUEUE, shared=1)
527 - def _AppendFeedback(self, timestamp, log_type, log_msg):
528 """Internal feedback append function, with locks 529 530 """ 531 self._job.log_serial += 1 532 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 533 self._queue.UpdateJobUnlocked(self._job, replicate=False)
534
535 - def Feedback(self, *args):
536 """Append a log entry. 537 538 """ 539 assert len(args) < 3 540 541 if len(args) == 1: 542 log_type = constants.ELOG_MESSAGE 543 log_msg = args[0] 544 else: 545 (log_type, log_msg) = args 546 547 # The time is split to make serialization easier and not lose 548 # precision. 549 timestamp = utils.SplitTime(time.time()) 550 self._AppendFeedback(timestamp, log_type, log_msg)
551
552 - def CheckCancel(self):
553 """Check whether job has been cancelled. 554 555 """ 556 assert self._op.status in (constants.OP_STATUS_WAITING, 557 constants.OP_STATUS_CANCELING) 558 559 # Cancel here if we were asked to 560 self._CheckCancel()
561
562 - def SubmitManyJobs(self, jobs):
563 """Submits jobs for processing. 564 565 See L{JobQueue.SubmitManyJobs}. 566 567 """ 568 # Locking is done in job queue 569 return self._queue.SubmitManyJobs(jobs)
570
571 572 -class _JobChangesChecker(object):
573 - def __init__(self, fields, prev_job_info, prev_log_serial):
574 """Initializes this class. 575 576 @type fields: list of strings 577 @param fields: Fields requested by LUXI client 578 @type prev_job_info: string 579 @param prev_job_info: previous job info, as passed by the LUXI client 580 @type prev_log_serial: string 581 @param prev_log_serial: previous job serial, as passed by the LUXI client 582 583 """ 584 self._squery = _SimpleJobQuery(fields) 585 self._prev_job_info = prev_job_info 586 self._prev_log_serial = prev_log_serial
587
588 - def __call__(self, job):
589 """Checks whether job has changed. 590 591 @type job: L{_QueuedJob} 592 @param job: Job object 593 594 """ 595 assert not job.writable, "Expected read-only job" 596 597 status = job.CalcStatus() 598 job_info = self._squery(job) 599 log_entries = job.GetLogEntries(self._prev_log_serial) 600 601 # Serializing and deserializing data can cause type changes (e.g. from 602 # tuple to list) or precision loss. We're doing it here so that we get 603 # the same modifications as the data received from the client. Without 604 # this, the comparison afterwards might fail without the data being 605 # significantly different. 606 # TODO: we just deserialized from disk, investigate how to make sure that 607 # the job info and log entries are compatible to avoid this further step. 608 # TODO: Doing something like in testutils.py:UnifyValueType might be more 609 # efficient, though floats will be tricky 610 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 611 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 612 613 # Don't even try to wait if the job is no longer running, there will be 614 # no changes. 615 if (status not in (constants.JOB_STATUS_QUEUED, 616 constants.JOB_STATUS_RUNNING, 617 constants.JOB_STATUS_WAITING) or 618 job_info != self._prev_job_info or 619 (log_entries and self._prev_log_serial != log_entries[0][0])): 620 logging.debug("Job %s changed", job.id) 621 return (job_info, log_entries) 622 623 return None
624
625 626 -class _JobFileChangesWaiter(object):
627 - def __init__(self, filename):
628 """Initializes this class. 629 630 @type filename: string 631 @param filename: Path to job file 632 @raises errors.InotifyError: if the notifier cannot be setup 633 634 """ 635 self._wm = pyinotify.WatchManager() 636 self._inotify_handler = \ 637 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 638 self._notifier = \ 639 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 640 try: 641 self._inotify_handler.enable() 642 except Exception: 643 # pyinotify doesn't close file descriptors automatically 644 self._notifier.stop() 645 raise
646
647 - def _OnInotify(self, notifier_enabled):
648 """Callback for inotify. 649 650 """ 651 if not notifier_enabled: 652 self._inotify_handler.enable()
653
654 - def Wait(self, timeout):
655 """Waits for the job file to change. 656 657 @type timeout: float 658 @param timeout: Timeout in seconds 659 @return: Whether there have been events 660 661 """ 662 assert timeout >= 0 663 have_events = self._notifier.check_events(timeout * 1000) 664 if have_events: 665 self._notifier.read_events() 666 self._notifier.process_events() 667 return have_events
668
669 - def Close(self):
670 """Closes underlying notifier and its file descriptor. 671 672 """ 673 self._notifier.stop()
674
675 676 -class _JobChangesWaiter(object):
677 - def __init__(self, filename):
678 """Initializes this class. 679 680 @type filename: string 681 @param filename: Path to job file 682 683 """ 684 self._filewaiter = None 685 self._filename = filename
686
687 - def Wait(self, timeout):
688 """Waits for a job to change. 689 690 @type timeout: float 691 @param timeout: Timeout in seconds 692 @return: Whether there have been events 693 694 """ 695 if self._filewaiter: 696 return self._filewaiter.Wait(timeout) 697 698 # Lazy setup: Avoid inotify setup cost when job file has already changed. 699 # If this point is reached, return immediately and let caller check the job 700 # file again in case there were changes since the last check. This avoids a 701 # race condition. 702 self._filewaiter = _JobFileChangesWaiter(self._filename) 703 704 return True
705
706 - def Close(self):
707 """Closes underlying waiter. 708 709 """ 710 if self._filewaiter: 711 self._filewaiter.Close()
712
713 714 -class _WaitForJobChangesHelper(object):
715 """Helper class using inotify to wait for changes in a job file. 716 717 This class takes a previous job status and serial, and alerts the client when 718 the current job status has changed. 719 720 """ 721 @staticmethod
722 - def _CheckForChanges(counter, job_load_fn, check_fn):
723 if counter.next() > 0: 724 # If this isn't the first check the job is given some more time to change 725 # again. This gives better performance for jobs generating many 726 # changes/messages. 727 time.sleep(0.1) 728 729 job = job_load_fn() 730 if not job: 731 raise errors.JobLost() 732 733 result = check_fn(job) 734 if result is None: 735 raise utils.RetryAgain() 736 737 return result
738
739 - def __call__(self, filename, job_load_fn, 740 fields, prev_job_info, prev_log_serial, timeout):
741 """Waits for changes on a job. 742 743 @type filename: string 744 @param filename: File on which to wait for changes 745 @type job_load_fn: callable 746 @param job_load_fn: Function to load job 747 @type fields: list of strings 748 @param fields: Which fields to check for changes 749 @type prev_job_info: list or None 750 @param prev_job_info: Last job information returned 751 @type prev_log_serial: int 752 @param prev_log_serial: Last job message serial number 753 @type timeout: float 754 @param timeout: maximum time to wait in seconds 755 756 """ 757 counter = itertools.count() 758 try: 759 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 760 waiter = _JobChangesWaiter(filename) 761 try: 762 return utils.Retry(compat.partial(self._CheckForChanges, 763 counter, job_load_fn, check_fn), 764 utils.RETRY_REMAINING_TIME, timeout, 765 wait_fn=waiter.Wait) 766 finally: 767 waiter.Close() 768 except (errors.InotifyError, errors.JobLost): 769 return None 770 except utils.RetryTimeout: 771 return constants.JOB_NOTCHANGED
772
773 774 -def _EncodeOpError(err):
775 """Encodes an error which occurred while processing an opcode. 776 777 """ 778 if isinstance(err, errors.GenericError): 779 to_encode = err 780 else: 781 to_encode = errors.OpExecError(str(err)) 782 783 return errors.EncodeException(to_encode)
784
785 786 -class _TimeoutStrategyWrapper:
787 - def __init__(self, fn):
788 """Initializes this class. 789 790 """ 791 self._fn = fn 792 self._next = None
793
794 - def _Advance(self):
795 """Gets the next timeout if necessary. 796 797 """ 798 if self._next is None: 799 self._next = self._fn()
800
801 - def Peek(self):
802 """Returns the next timeout. 803 804 """ 805 self._Advance() 806 return self._next
807
808 - def Next(self):
809 """Returns the current timeout and advances the internal state. 810 811 """ 812 self._Advance() 813 result = self._next 814 self._next = None 815 return result
816
817 818 -class _OpExecContext:
819 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
820 """Initializes this class. 821 822 """ 823 self.op = op 824 self.index = index 825 self.log_prefix = log_prefix 826 self.summary = op.input.Summary() 827 828 # Create local copy to modify 829 if getattr(op.input, opcodes.DEPEND_ATTR, None): 830 self.jobdeps = op.input.depends[:] 831 else: 832 self.jobdeps = None 833 834 self._timeout_strategy_factory = timeout_strategy_factory 835 self._ResetTimeoutStrategy()
836
837 - def _ResetTimeoutStrategy(self):
838 """Creates a new timeout strategy. 839 840 """ 841 self._timeout_strategy = \ 842 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
843
844 - def CheckPriorityIncrease(self):
845 """Checks whether priority can and should be increased. 846 847 Called when locks couldn't be acquired. 848 849 """ 850 op = self.op 851 852 # Exhausted all retries and next round should not use blocking acquire 853 # for locks? 854 if (self._timeout_strategy.Peek() is None and 855 op.priority > constants.OP_PRIO_HIGHEST): 856 logging.debug("Increasing priority") 857 op.priority -= 1 858 self._ResetTimeoutStrategy() 859 return True 860 861 return False
862
863 - def GetNextLockTimeout(self):
864 """Returns the next lock acquire timeout. 865 866 """ 867 return self._timeout_strategy.Next()
868
869 870 -class _JobProcessor(object):
871 (DEFER, 872 WAITDEP, 873 FINISHED) = range(1, 4) 874
875 - def __init__(self, queue, opexec_fn, job, 876 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
877 """Initializes this class. 878 879 """ 880 self.queue = queue 881 self.opexec_fn = opexec_fn 882 self.job = job 883 self._timeout_strategy_factory = _timeout_strategy_factory
884 885 @staticmethod
886 - def _FindNextOpcode(job, timeout_strategy_factory):
887 """Locates the next opcode to run. 888 889 @type job: L{_QueuedJob} 890 @param job: Job object 891 @param timeout_strategy_factory: Callable to create new timeout strategy 892 893 """ 894 # Create some sort of a cache to speed up locating next opcode for future 895 # lookups 896 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 897 # pending and one for processed ops. 898 if job.ops_iter is None: 899 job.ops_iter = enumerate(job.ops) 900 901 # Find next opcode to run 902 while True: 903 try: 904 (idx, op) = job.ops_iter.next() 905 except StopIteration: 906 raise errors.ProgrammerError("Called for a finished job") 907 908 if op.status == constants.OP_STATUS_RUNNING: 909 # Found an opcode already marked as running 910 raise errors.ProgrammerError("Called for job marked as running") 911 912 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 913 timeout_strategy_factory) 914 915 if op.status not in constants.OPS_FINALIZED: 916 return opctx 917 918 # This is a job that was partially completed before master daemon 919 # shutdown, so it can be expected that some opcodes are already 920 # completed successfully (if any did error out, then the whole job 921 # should have been aborted and not resubmitted for processing). 922 logging.info("%s: opcode %s already processed, skipping", 923 opctx.log_prefix, opctx.summary)
924 925 @staticmethod
926 - def _MarkWaitlock(job, op):
927 """Marks an opcode as waiting for locks. 928 929 The job's start timestamp is also set if necessary. 930 931 @type job: L{_QueuedJob} 932 @param job: Job object 933 @type op: L{_QueuedOpCode} 934 @param op: Opcode object 935 936 """ 937 assert op in job.ops 938 assert op.status in (constants.OP_STATUS_QUEUED, 939 constants.OP_STATUS_WAITING) 940 941 update = False 942 943 op.result = None 944 945 if op.status == constants.OP_STATUS_QUEUED: 946 op.status = constants.OP_STATUS_WAITING 947 update = True 948 949 if op.start_timestamp is None: 950 op.start_timestamp = TimeStampNow() 951 update = True 952 953 if job.start_timestamp is None: 954 job.start_timestamp = op.start_timestamp 955 update = True 956 957 assert op.status == constants.OP_STATUS_WAITING 958 959 return update
960 961 @staticmethod
962 - def _CheckDependencies(queue, job, opctx):
963 """Checks if an opcode has dependencies and if so, processes them. 964 965 @type queue: L{JobQueue} 966 @param queue: Queue object 967 @type job: L{_QueuedJob} 968 @param job: Job object 969 @type opctx: L{_OpExecContext} 970 @param opctx: Opcode execution context 971 @rtype: bool 972 @return: Whether opcode will be re-scheduled by dependency tracker 973 974 """ 975 op = opctx.op 976 977 result = False 978 979 while opctx.jobdeps: 980 (dep_job_id, dep_status) = opctx.jobdeps[0] 981 982 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 983 dep_status) 984 assert ht.TNonEmptyString(depmsg), "No dependency message" 985 986 logging.info("%s: %s", opctx.log_prefix, depmsg) 987 988 if depresult == _JobDependencyManager.CONTINUE: 989 # Remove dependency and continue 990 opctx.jobdeps.pop(0) 991 992 elif depresult == _JobDependencyManager.WAIT: 993 # Need to wait for notification, dependency tracker will re-add job 994 # to workerpool 995 result = True 996 break 997 998 elif depresult == _JobDependencyManager.CANCEL: 999 # Job was cancelled, cancel this job as well 1000 job.Cancel() 1001 assert op.status == constants.OP_STATUS_CANCELING 1002 break 1003 1004 elif depresult in (_JobDependencyManager.WRONGSTATUS, 1005 _JobDependencyManager.ERROR): 1006 # Job failed or there was an error, this job must fail 1007 op.status = constants.OP_STATUS_ERROR 1008 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 1009 break 1010 1011 else: 1012 raise errors.ProgrammerError("Unknown dependency result '%s'" % 1013 depresult) 1014 1015 return result
1016
1017 - def _ExecOpCodeUnlocked(self, opctx):
1018 """Processes one opcode and returns the result. 1019 1020 """ 1021 op = opctx.op 1022 1023 assert op.status == constants.OP_STATUS_WAITING 1024 1025 timeout = opctx.GetNextLockTimeout() 1026 1027 try: 1028 # Make sure not to hold queue lock while calling ExecOpCode 1029 result = self.opexec_fn(op.input, 1030 _OpExecCallbacks(self.queue, self.job, op), 1031 timeout=timeout, priority=op.priority) 1032 except mcpu.LockAcquireTimeout: 1033 assert timeout is not None, "Received timeout for blocking acquire" 1034 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1035 1036 assert op.status in (constants.OP_STATUS_WAITING, 1037 constants.OP_STATUS_CANCELING) 1038 1039 # Was job cancelled while we were waiting for the lock? 1040 if op.status == constants.OP_STATUS_CANCELING: 1041 return (constants.OP_STATUS_CANCELING, None) 1042 1043 # Queue is shutting down, return to queued 1044 if not self.queue.AcceptingJobsUnlocked(): 1045 return (constants.OP_STATUS_QUEUED, None) 1046 1047 # Stay in waitlock while trying to re-acquire lock 1048 return (constants.OP_STATUS_WAITING, None) 1049 except CancelJob: 1050 logging.exception("%s: Canceling job", opctx.log_prefix) 1051 assert op.status == constants.OP_STATUS_CANCELING 1052 return (constants.OP_STATUS_CANCELING, None) 1053 1054 except QueueShutdown: 1055 logging.exception("%s: Queue is shutting down", opctx.log_prefix) 1056 1057 assert op.status == constants.OP_STATUS_WAITING 1058 1059 # Job hadn't been started yet, so it should return to the queue 1060 return (constants.OP_STATUS_QUEUED, None) 1061 1062 except Exception, err: # pylint: disable=W0703 1063 logging.exception("%s: Caught exception in %s", 1064 opctx.log_prefix, opctx.summary) 1065 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1066 else: 1067 logging.debug("%s: %s successful", 1068 opctx.log_prefix, opctx.summary) 1069 return (constants.OP_STATUS_SUCCESS, result)
1070
1071 - def __call__(self, _nextop_fn=None):
1072 """Continues execution of a job. 1073 1074 @param _nextop_fn: Callback function for tests 1075 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 1076 be deferred and C{WAITDEP} if the dependency manager 1077 (L{_JobDependencyManager}) will re-schedule the job when appropriate 1078 1079 """ 1080 queue = self.queue 1081 job = self.job 1082 1083 logging.debug("Processing job %s", job.id) 1084 1085 queue.acquire(shared=1) 1086 try: 1087 opcount = len(job.ops) 1088 1089 assert job.writable, "Expected writable job" 1090 1091 # Don't do anything for finalized jobs 1092 if job.CalcStatus() in constants.JOBS_FINALIZED: 1093 return self.FINISHED 1094 1095 # Is a previous opcode still pending? 1096 if job.cur_opctx: 1097 opctx = job.cur_opctx 1098 job.cur_opctx = None 1099 else: 1100 if __debug__ and _nextop_fn: 1101 _nextop_fn() 1102 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 1103 1104 op = opctx.op 1105 1106 # Consistency check 1107 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 1108 constants.OP_STATUS_CANCELING) 1109 for i in job.ops[opctx.index + 1:]) 1110 1111 assert op.status in (constants.OP_STATUS_QUEUED, 1112 constants.OP_STATUS_WAITING, 1113 constants.OP_STATUS_CANCELING) 1114 1115 assert (op.priority <= constants.OP_PRIO_LOWEST and 1116 op.priority >= constants.OP_PRIO_HIGHEST) 1117 1118 waitjob = None 1119 1120 if op.status != constants.OP_STATUS_CANCELING: 1121 assert op.status in (constants.OP_STATUS_QUEUED, 1122 constants.OP_STATUS_WAITING) 1123 1124 # Prepare to start opcode 1125 if self._MarkWaitlock(job, op): 1126 # Write to disk 1127 queue.UpdateJobUnlocked(job) 1128 1129 assert op.status == constants.OP_STATUS_WAITING 1130 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1131 assert job.start_timestamp and op.start_timestamp 1132 assert waitjob is None 1133 1134 # Check if waiting for a job is necessary 1135 waitjob = self._CheckDependencies(queue, job, opctx) 1136 1137 assert op.status in (constants.OP_STATUS_WAITING, 1138 constants.OP_STATUS_CANCELING, 1139 constants.OP_STATUS_ERROR) 1140 1141 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1142 constants.OP_STATUS_ERROR)): 1143 logging.info("%s: opcode %s waiting for locks", 1144 opctx.log_prefix, opctx.summary) 1145 1146 assert not opctx.jobdeps, "Not all dependencies were removed" 1147 1148 queue.release() 1149 try: 1150 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1151 finally: 1152 queue.acquire(shared=1) 1153 1154 op.status = op_status 1155 op.result = op_result 1156 1157 assert not waitjob 1158 1159 if op.status in (constants.OP_STATUS_WAITING, 1160 constants.OP_STATUS_QUEUED): 1161 # waiting: Couldn't get locks in time 1162 # queued: Queue is shutting down 1163 assert not op.end_timestamp 1164 else: 1165 # Finalize opcode 1166 op.end_timestamp = TimeStampNow() 1167 1168 if op.status == constants.OP_STATUS_CANCELING: 1169 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1170 for i in job.ops[opctx.index:]) 1171 else: 1172 assert op.status in constants.OPS_FINALIZED 1173 1174 if op.status == constants.OP_STATUS_QUEUED: 1175 # Queue is shutting down 1176 assert not waitjob 1177 1178 finalize = False 1179 1180 # Reset context 1181 job.cur_opctx = None 1182 1183 # In no case must the status be finalized here 1184 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1185 1186 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1187 finalize = False 1188 1189 if not waitjob and opctx.CheckPriorityIncrease(): 1190 # Priority was changed, need to update on-disk file 1191 queue.UpdateJobUnlocked(job) 1192 1193 # Keep around for another round 1194 job.cur_opctx = opctx 1195 1196 assert (op.priority <= constants.OP_PRIO_LOWEST and 1197 op.priority >= constants.OP_PRIO_HIGHEST) 1198 1199 # In no case must the status be finalized here 1200 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1201 1202 else: 1203 # Ensure all opcodes so far have been successful 1204 assert (opctx.index == 0 or 1205 compat.all(i.status == constants.OP_STATUS_SUCCESS 1206 for i in job.ops[:opctx.index])) 1207 1208 # Reset context 1209 job.cur_opctx = None 1210 1211 if op.status == constants.OP_STATUS_SUCCESS: 1212 finalize = False 1213 1214 elif op.status == constants.OP_STATUS_ERROR: 1215 # Ensure failed opcode has an exception as its result 1216 assert errors.GetEncodedError(job.ops[opctx.index].result) 1217 1218 to_encode = errors.OpExecError("Preceding opcode failed") 1219 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1220 _EncodeOpError(to_encode)) 1221 finalize = True 1222 1223 # Consistency check 1224 assert compat.all(i.status == constants.OP_STATUS_ERROR and 1225 errors.GetEncodedError(i.result) 1226 for i in job.ops[opctx.index:]) 1227 1228 elif op.status == constants.OP_STATUS_CANCELING: 1229 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1230 "Job canceled by request") 1231 finalize = True 1232 1233 else: 1234 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1235 1236 if opctx.index == (opcount - 1): 1237 # Finalize on last opcode 1238 finalize = True 1239 1240 if finalize: 1241 # All opcodes have been run, finalize job 1242 job.Finalize() 1243 1244 # Write to disk. If the job status is final, this is the final write 1245 # allowed. Once the file has been written, it can be archived anytime. 1246 queue.UpdateJobUnlocked(job) 1247 1248 assert not waitjob 1249 1250 if finalize: 1251 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1252 return self.FINISHED 1253 1254 assert not waitjob or queue.depmgr.JobWaiting(job) 1255 1256 if waitjob: 1257 return self.WAITDEP 1258 else: 1259 return self.DEFER 1260 finally: 1261 assert job.writable, "Job became read-only while being processed" 1262 queue.release()
1263
1264 1265 -def _EvaluateJobProcessorResult(depmgr, job, result):
1266 """Looks at a result from L{_JobProcessor} for a job. 1267 1268 To be used in a L{_JobQueueWorker}. 1269 1270 """ 1271 if result == _JobProcessor.FINISHED: 1272 # Notify waiting jobs 1273 depmgr.NotifyWaiters(job.id) 1274 1275 elif result == _JobProcessor.DEFER: 1276 # Schedule again 1277 raise workerpool.DeferTask(priority=job.CalcPriority()) 1278 1279 elif result == _JobProcessor.WAITDEP: 1280 # No-op, dependency manager will re-schedule 1281 pass 1282 1283 else: 1284 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1285 (result, ))
1286
1287 1288 -class _JobQueueWorker(workerpool.BaseWorker):
1289 """The actual job workers. 1290 1291 """
1292 - def RunTask(self, job): # pylint: disable=W0221
1293 """Job executor. 1294 1295 @type job: L{_QueuedJob} 1296 @param job: the job to be processed 1297 1298 """ 1299 assert job.writable, "Expected writable job" 1300 1301 # Ensure only one worker is active on a single job. If a job registers for 1302 # a dependency job, and the other job notifies before the first worker is 1303 # done, the job can end up in the tasklist more than once. 1304 job.processor_lock.acquire() 1305 try: 1306 return self._RunTaskInner(job) 1307 finally: 1308 job.processor_lock.release()
1309
1310 - def _RunTaskInner(self, job):
1311 """Executes a job. 1312 1313 Must be called with per-job lock acquired. 1314 1315 """ 1316 queue = job.queue 1317 assert queue == self.pool.queue 1318 1319 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1320 setname_fn(None) 1321 1322 proc = mcpu.Processor(queue.context, job.id) 1323 1324 # Create wrapper for setting thread name 1325 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1326 proc.ExecOpCode) 1327 1328 _EvaluateJobProcessorResult(queue.depmgr, job, 1329 _JobProcessor(queue, wrap_execop_fn, job)())
1330 1331 @staticmethod
1332 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1333 """Updates the worker thread name to include a short summary of the opcode. 1334 1335 @param setname_fn: Callable setting worker thread name 1336 @param execop_fn: Callable for executing opcode (usually 1337 L{mcpu.Processor.ExecOpCode}) 1338 1339 """ 1340 setname_fn(op) 1341 try: 1342 return execop_fn(op, *args, **kwargs) 1343 finally: 1344 setname_fn(None)
1345 1346 @staticmethod
1347 - def _GetWorkerName(job, op):
1348 """Sets the worker thread name. 1349 1350 @type job: L{_QueuedJob} 1351 @type op: L{opcodes.OpCode} 1352 1353 """ 1354 parts = ["Job%s" % job.id] 1355 1356 if op: 1357 parts.append(op.TinySummary()) 1358 1359 return "/".join(parts)
1360
1361 1362 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1363 """Simple class implementing a job-processing workerpool. 1364 1365 """
1366 - def __init__(self, queue):
1367 super(_JobQueueWorkerPool, self).__init__("Jq", 1368 JOBQUEUE_THREADS, 1369 _JobQueueWorker) 1370 self.queue = queue
1371
1372 1373 -class _JobDependencyManager:
1374 """Keeps track of job dependencies. 1375 1376 """ 1377 (WAIT, 1378 ERROR, 1379 CANCEL, 1380 CONTINUE, 1381 WRONGSTATUS) = range(1, 6) 1382
1383 - def __init__(self, getstatus_fn, enqueue_fn):
1384 """Initializes this class. 1385 1386 """ 1387 self._getstatus_fn = getstatus_fn 1388 self._enqueue_fn = enqueue_fn 1389 1390 self._waiters = {} 1391 self._lock = locking.SharedLock("JobDepMgr")
1392 1393 @locking.ssynchronized(_LOCK, shared=1)
1394 - def GetLockInfo(self, requested): # pylint: disable=W0613
1395 """Retrieves information about waiting jobs. 1396 1397 @type requested: set 1398 @param requested: Requested information, see C{query.LQ_*} 1399 1400 """ 1401 # No need to sort here, that's being done by the lock manager and query 1402 # library. There are no priorities for notifying jobs, hence all show up as 1403 # one item under "pending". 1404 return [("job/%s" % job_id, None, None, 1405 [("job", [job.id for job in waiters])]) 1406 for job_id, waiters in self._waiters.items() 1407 if waiters]
1408 1409 @locking.ssynchronized(_LOCK, shared=1)
1410 - def JobWaiting(self, job):
1411 """Checks if a job is waiting. 1412 1413 """ 1414 return compat.any(job in jobs 1415 for jobs in self._waiters.values())
1416 1417 @locking.ssynchronized(_LOCK)
1418 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1419 """Checks if a dependency job has the requested status. 1420 1421 If the other job is not yet in a finalized status, the calling job will be 1422 notified (re-added to the workerpool) at a later point. 1423 1424 @type job: L{_QueuedJob} 1425 @param job: Job object 1426 @type dep_job_id: string 1427 @param dep_job_id: ID of dependency job 1428 @type dep_status: list 1429 @param dep_status: Required status 1430 1431 """ 1432 assert ht.TString(job.id) 1433 assert ht.TString(dep_job_id) 1434 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1435 1436 if job.id == dep_job_id: 1437 return (self.ERROR, "Job can't depend on itself") 1438 1439 # Get status of dependency job 1440 try: 1441 status = self._getstatus_fn(dep_job_id) 1442 except errors.JobLost, err: 1443 return (self.ERROR, "Dependency error: %s" % err) 1444 1445 assert status in constants.JOB_STATUS_ALL 1446 1447 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1448 1449 if status not in constants.JOBS_FINALIZED: 1450 # Register for notification and wait for job to finish 1451 job_id_waiters.add(job) 1452 return (self.WAIT, 1453 "Need to wait for job %s, wanted status '%s'" % 1454 (dep_job_id, dep_status)) 1455 1456 # Remove from waiters list 1457 if job in job_id_waiters: 1458 job_id_waiters.remove(job) 1459 1460 if (status == constants.JOB_STATUS_CANCELED and 1461 constants.JOB_STATUS_CANCELED not in dep_status): 1462 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1463 1464 elif not dep_status or status in dep_status: 1465 return (self.CONTINUE, 1466 "Dependency job %s finished with status '%s'" % 1467 (dep_job_id, status)) 1468 1469 else: 1470 return (self.WRONGSTATUS, 1471 "Dependency job %s finished with status '%s'," 1472 " not one of '%s' as required" % 1473 (dep_job_id, status, utils.CommaJoin(dep_status)))
1474
1475 - def _RemoveEmptyWaitersUnlocked(self):
1476 """Remove all jobs without actual waiters. 1477 1478 """ 1479 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1480 if not waiters]: 1481 del self._waiters[job_id]
1482
1483 - def NotifyWaiters(self, job_id):
1484 """Notifies all jobs waiting for a certain job ID. 1485 1486 @attention: Do not call until L{CheckAndRegister} returned a status other 1487 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1488 @type job_id: string 1489 @param job_id: Job ID 1490 1491 """ 1492 assert ht.TString(job_id) 1493 1494 self._lock.acquire() 1495 try: 1496 self._RemoveEmptyWaitersUnlocked() 1497 1498 jobs = self._waiters.pop(job_id, None) 1499 finally: 1500 self._lock.release() 1501 1502 if jobs: 1503 # Re-add jobs to workerpool 1504 logging.debug("Re-adding %s jobs which were waiting for job %s", 1505 len(jobs), job_id) 1506 self._enqueue_fn(jobs)
1507
1508 1509 -def _RequireOpenQueue(fn):
1510 """Decorator for "public" functions. 1511 1512 This function should be used for all 'public' functions. That is, 1513 functions usually called from other classes. Note that this should 1514 be applied only to methods (not plain functions), since it expects 1515 that the decorated function is called with a first argument that has 1516 a '_queue_filelock' argument. 1517 1518 @warning: Use this decorator only after locking.ssynchronized 1519 1520 Example:: 1521 @locking.ssynchronized(_LOCK) 1522 @_RequireOpenQueue 1523 def Example(self): 1524 pass 1525 1526 """ 1527 def wrapper(self, *args, **kwargs): 1528 # pylint: disable=W0212 1529 assert self._queue_filelock is not None, "Queue should be open" 1530 return fn(self, *args, **kwargs)
1531 return wrapper 1532
1533 1534 -def _RequireNonDrainedQueue(fn):
1535 """Decorator checking for a non-drained queue. 1536 1537 To be used with functions submitting new jobs. 1538 1539 """ 1540 def wrapper(self, *args, **kwargs): 1541 """Wrapper function. 1542 1543 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1544 1545 """ 1546 # Ok when sharing the big job queue lock, as the drain file is created when 1547 # the lock is exclusive. 1548 # Needs access to protected member, pylint: disable=W0212 1549 if self._drained: 1550 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1551 1552 if not self._accepting_jobs: 1553 raise errors.JobQueueError("Job queue is shutting down, refusing job") 1554 1555 return fn(self, *args, **kwargs)
1556 return wrapper 1557
1558 1559 -class JobQueue(object):
1560 """Queue used to manage the jobs. 1561 1562 """
1563 - def __init__(self, context):
1564 """Constructor for JobQueue. 1565 1566 The constructor will initialize the job queue object and then 1567 start loading the current jobs from disk, either for starting them 1568 (if they were queue) or for aborting them (if they were already 1569 running). 1570 1571 @type context: GanetiContext 1572 @param context: the context object for access to the configuration 1573 data and other ganeti objects 1574 1575 """ 1576 self.context = context 1577 self._memcache = weakref.WeakValueDictionary() 1578 self._my_hostname = netutils.Hostname.GetSysName() 1579 1580 # The Big JobQueue lock. If a code block or method acquires it in shared 1581 # mode safe it must guarantee concurrency with all the code acquiring it in 1582 # shared mode, including itself. In order not to acquire it at all 1583 # concurrency must be guaranteed with all code acquiring it in shared mode 1584 # and all code acquiring it exclusively. 1585 self._lock = locking.SharedLock("JobQueue") 1586 1587 self.acquire = self._lock.acquire 1588 self.release = self._lock.release 1589 1590 # Accept jobs by default 1591 self._accepting_jobs = True 1592 1593 # Initialize the queue, and acquire the filelock. 1594 # This ensures no other process is working on the job queue. 1595 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 1596 1597 # Read serial file 1598 self._last_serial = jstore.ReadSerial() 1599 assert self._last_serial is not None, ("Serial file was modified between" 1600 " check in jstore and here") 1601 1602 # Get initial list of nodes 1603 self._nodes = dict((n.name, n.primary_ip) 1604 for n in self.context.cfg.GetAllNodesInfo().values() 1605 if n.master_candidate) 1606 1607 # Remove master node 1608 self._nodes.pop(self._my_hostname, None) 1609 1610 # TODO: Check consistency across nodes 1611 1612 self._queue_size = None 1613 self._UpdateQueueSizeUnlocked() 1614 assert ht.TInt(self._queue_size) 1615 self._drained = jstore.CheckDrainFlag() 1616 1617 # Job dependencies 1618 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1619 self._EnqueueJobs) 1620 self.context.glm.AddToLockMonitor(self.depmgr) 1621 1622 # Setup worker pool 1623 self._wpool = _JobQueueWorkerPool(self) 1624 try: 1625 self._InspectQueue() 1626 except: 1627 self._wpool.TerminateWorkers() 1628 raise
1629 1630 @locking.ssynchronized(_LOCK) 1631 @_RequireOpenQueue
1632 - def _InspectQueue(self):
1633 """Loads the whole job queue and resumes unfinished jobs. 1634 1635 This function needs the lock here because WorkerPool.AddTask() may start a 1636 job while we're still doing our work. 1637 1638 """ 1639 logging.info("Inspecting job queue") 1640 1641 restartjobs = [] 1642 1643 all_job_ids = self._GetJobIDsUnlocked() 1644 jobs_count = len(all_job_ids) 1645 lastinfo = time.time() 1646 for idx, job_id in enumerate(all_job_ids): 1647 # Give an update every 1000 jobs or 10 seconds 1648 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or 1649 idx == (jobs_count - 1)): 1650 logging.info("Job queue inspection: %d/%d (%0.1f %%)", 1651 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) 1652 lastinfo = time.time() 1653 1654 job = self._LoadJobUnlocked(job_id) 1655 1656 # a failure in loading the job can cause 'None' to be returned 1657 if job is None: 1658 continue 1659 1660 status = job.CalcStatus() 1661 1662 if status == constants.JOB_STATUS_QUEUED: 1663 restartjobs.append(job) 1664 1665 elif status in (constants.JOB_STATUS_RUNNING, 1666 constants.JOB_STATUS_WAITING, 1667 constants.JOB_STATUS_CANCELING): 1668 logging.warning("Unfinished job %s found: %s", job.id, job) 1669 1670 if status == constants.JOB_STATUS_WAITING: 1671 # Restart job 1672 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1673 restartjobs.append(job) 1674 else: 1675 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1676 "Unclean master daemon shutdown") 1677 job.Finalize() 1678 1679 self.UpdateJobUnlocked(job) 1680 1681 if restartjobs: 1682 logging.info("Restarting %s jobs", len(restartjobs)) 1683 self._EnqueueJobsUnlocked(restartjobs) 1684 1685 logging.info("Job queue inspection finished")
1686
1687 - def _GetRpc(self, address_list):
1688 """Gets RPC runner with context. 1689 1690 """ 1691 return rpc.JobQueueRunner(self.context, address_list)
1692 1693 @locking.ssynchronized(_LOCK) 1694 @_RequireOpenQueue
1695 - def AddNode(self, node):
1696 """Register a new node with the queue. 1697 1698 @type node: L{objects.Node} 1699 @param node: the node object to be added 1700 1701 """ 1702 node_name = node.name 1703 assert node_name != self._my_hostname 1704 1705 # Clean queue directory on added node 1706 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1707 msg = result.fail_msg 1708 if msg: 1709 logging.warning("Cannot cleanup queue directory on node %s: %s", 1710 node_name, msg) 1711 1712 if not node.master_candidate: 1713 # remove if existing, ignoring errors 1714 self._nodes.pop(node_name, None) 1715 # and skip the replication of the job ids 1716 return 1717 1718 # Upload the whole queue excluding archived jobs 1719 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1720 1721 # Upload current serial file 1722 files.append(constants.JOB_QUEUE_SERIAL_FILE) 1723 1724 # Static address list 1725 addrs = [node.primary_ip] 1726 1727 for file_name in files: 1728 # Read file content 1729 content = utils.ReadFile(file_name) 1730 1731 result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name, 1732 content) 1733 msg = result[node_name].fail_msg 1734 if msg: 1735 logging.error("Failed to upload file %s to node %s: %s", 1736 file_name, node_name, msg) 1737 1738 self._nodes[node_name] = node.primary_ip
1739 1740 @locking.ssynchronized(_LOCK) 1741 @_RequireOpenQueue
1742 - def RemoveNode(self, node_name):
1743 """Callback called when removing nodes from the cluster. 1744 1745 @type node_name: str 1746 @param node_name: the name of the node to remove 1747 1748 """ 1749 self._nodes.pop(node_name, None)
1750 1751 @staticmethod
1752 - def _CheckRpcResult(result, nodes, failmsg):
1753 """Verifies the status of an RPC call. 1754 1755 Since we aim to keep consistency should this node (the current 1756 master) fail, we will log errors if our rpc fail, and especially 1757 log the case when more than half of the nodes fails. 1758 1759 @param result: the data as returned from the rpc call 1760 @type nodes: list 1761 @param nodes: the list of nodes we made the call to 1762 @type failmsg: str 1763 @param failmsg: the identifier to be used for logging 1764 1765 """ 1766 failed = [] 1767 success = [] 1768 1769 for node in nodes: 1770 msg = result[node].fail_msg 1771 if msg: 1772 failed.append(node) 1773 logging.error("RPC call %s (%s) failed on node %s: %s", 1774 result[node].call, failmsg, node, msg) 1775 else: 1776 success.append(node) 1777 1778 # +1 for the master node 1779 if (len(success) + 1) < len(failed): 1780 # TODO: Handle failing nodes 1781 logging.error("More than half of the nodes failed")
1782
1783 - def _GetNodeIp(self):
1784 """Helper for returning the node name/ip list. 1785 1786 @rtype: (list, list) 1787 @return: a tuple of two lists, the first one with the node 1788 names and the second one with the node addresses 1789 1790 """ 1791 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1792 name_list = self._nodes.keys() 1793 addr_list = [self._nodes[name] for name in name_list] 1794 return name_list, addr_list
1795
1796 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1797 """Writes a file locally and then replicates it to all nodes. 1798 1799 This function will replace the contents of a file on the local 1800 node and then replicate it to all the other nodes we have. 1801 1802 @type file_name: str 1803 @param file_name: the path of the file to be replicated 1804 @type data: str 1805 @param data: the new contents of the file 1806 @type replicate: boolean 1807 @param replicate: whether to spread the changes to the remote nodes 1808 1809 """ 1810 getents = runtime.GetEnts() 1811 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1812 gid=getents.masterd_gid) 1813 1814 if replicate: 1815 names, addrs = self._GetNodeIp() 1816 result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data) 1817 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1818
1819 - def _RenameFilesUnlocked(self, rename):
1820 """Renames a file locally and then replicate the change. 1821 1822 This function will rename a file in the local queue directory 1823 and then replicate this rename to all the other nodes we have. 1824 1825 @type rename: list of (old, new) 1826 @param rename: List containing tuples mapping old to new names 1827 1828 """ 1829 # Rename them locally 1830 for old, new in rename: 1831 utils.RenameFile(old, new, mkdir=True) 1832 1833 # ... and on all nodes 1834 names, addrs = self._GetNodeIp() 1835 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1836 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1837 1838 @staticmethod
1839 - def _FormatJobID(job_id):
1840 """Convert a job ID to string format. 1841 1842 Currently this just does C{str(job_id)} after performing some 1843 checks, but if we want to change the job id format this will 1844 abstract this change. 1845 1846 @type job_id: int or long 1847 @param job_id: the numeric job id 1848 @rtype: str 1849 @return: the formatted job id 1850 1851 """ 1852 if not isinstance(job_id, (int, long)): 1853 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) 1854 if job_id < 0: 1855 raise errors.ProgrammerError("Job ID %s is negative" % job_id) 1856 1857 return str(job_id)
1858 1859 @classmethod
1860 - def _GetArchiveDirectory(cls, job_id):
1861 """Returns the archive directory for a job. 1862 1863 @type job_id: str 1864 @param job_id: Job identifier 1865 @rtype: str 1866 @return: Directory name 1867 1868 """ 1869 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1870
1871 - def _NewSerialsUnlocked(self, count):
1872 """Generates a new job identifier. 1873 1874 Job identifiers are unique during the lifetime of a cluster. 1875 1876 @type count: integer 1877 @param count: how many serials to return 1878 @rtype: str 1879 @return: a string representing the job identifier. 1880 1881 """ 1882 assert ht.TPositiveInt(count) 1883 1884 # New number 1885 serial = self._last_serial + count 1886 1887 # Write to file 1888 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, 1889 "%s\n" % serial, True) 1890 1891 result = [self._FormatJobID(v) 1892 for v in range(self._last_serial + 1, serial + 1)] 1893 1894 # Keep it only if we were able to write the file 1895 self._last_serial = serial 1896 1897 assert len(result) == count 1898 1899 return result
1900 1901 @staticmethod
1902 - def _GetJobPath(job_id):
1903 """Returns the job file for a given job id. 1904 1905 @type job_id: str 1906 @param job_id: the job identifier 1907 @rtype: str 1908 @return: the path to the job file 1909 1910 """ 1911 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1912 1913 @classmethod
1914 - def _GetArchivedJobPath(cls, job_id):
1915 """Returns the archived job file for a give job id. 1916 1917 @type job_id: str 1918 @param job_id: the job identifier 1919 @rtype: str 1920 @return: the path to the archived job file 1921 1922 """ 1923 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, 1924 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1925 1926 @staticmethod
1927 - def _GetJobIDsUnlocked(sort=True):
1928 """Return all known job IDs. 1929 1930 The method only looks at disk because it's a requirement that all 1931 jobs are present on disk (so in the _memcache we don't have any 1932 extra IDs). 1933 1934 @type sort: boolean 1935 @param sort: perform sorting on the returned job ids 1936 @rtype: list 1937 @return: the list of job IDs 1938 1939 """ 1940 jlist = [] 1941 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): 1942 m = constants.JOB_FILE_RE.match(filename) 1943 if m: 1944 jlist.append(m.group(1)) 1945 if sort: 1946 jlist = utils.NiceSort(jlist) 1947 return jlist
1948
1949 - def _LoadJobUnlocked(self, job_id):
1950 """Loads a job from the disk or memory. 1951 1952 Given a job id, this will return the cached job object if 1953 existing, or try to load the job from the disk. If loading from 1954 disk, it will also add the job to the cache. 1955 1956 @param job_id: the job id 1957 @rtype: L{_QueuedJob} or None 1958 @return: either None or the job object 1959 1960 """ 1961 job = self._memcache.get(job_id, None) 1962 if job: 1963 logging.debug("Found job %s in memcache", job_id) 1964 assert job.writable, "Found read-only job in memcache" 1965 return job 1966 1967 try: 1968 job = self._LoadJobFromDisk(job_id, False) 1969 if job is None: 1970 return job 1971 except errors.JobFileCorrupted: 1972 old_path = self._GetJobPath(job_id) 1973 new_path = self._GetArchivedJobPath(job_id) 1974 if old_path == new_path: 1975 # job already archived (future case) 1976 logging.exception("Can't parse job %s", job_id) 1977 else: 1978 # non-archived case 1979 logging.exception("Can't parse job %s, will archive.", job_id) 1980 self._RenameFilesUnlocked([(old_path, new_path)]) 1981 return None 1982 1983 assert job.writable, "Job just loaded is not writable" 1984 1985 self._memcache[job_id] = job 1986 logging.debug("Added job %s to the cache", job_id) 1987 return job
1988
1989 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1990 """Load the given job file from disk. 1991 1992 Given a job file, read, load and restore it in a _QueuedJob format. 1993 1994 @type job_id: string 1995 @param job_id: job identifier 1996 @type try_archived: bool 1997 @param try_archived: Whether to try loading an archived job 1998 @rtype: L{_QueuedJob} or None 1999 @return: either None or the job object 2000 2001 """ 2002 path_functions = [(self._GetJobPath, True)] 2003 2004 if try_archived: 2005 path_functions.append((self._GetArchivedJobPath, False)) 2006 2007 raw_data = None 2008 writable_default = None 2009 2010 for (fn, writable_default) in path_functions: 2011 filepath = fn(job_id) 2012 logging.debug("Loading job from %s", filepath) 2013 try: 2014 raw_data = utils.ReadFile(filepath) 2015 except EnvironmentError, err: 2016 if err.errno != errno.ENOENT: 2017 raise 2018 else: 2019 break 2020 2021 if not raw_data: 2022 return None 2023 2024 if writable is None: 2025 writable = writable_default 2026 2027 try: 2028 data = serializer.LoadJson(raw_data) 2029 job = _QueuedJob.Restore(self, data, writable) 2030 except Exception, err: # pylint: disable=W0703 2031 raise errors.JobFileCorrupted(err) 2032 2033 return job
2034
2035 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2036 """Load the given job file from disk. 2037 2038 Given a job file, read, load and restore it in a _QueuedJob format. 2039 In case of error reading the job, it gets returned as None, and the 2040 exception is logged. 2041 2042 @type job_id: string 2043 @param job_id: job identifier 2044 @type try_archived: bool 2045 @param try_archived: Whether to try loading an archived job 2046 @rtype: L{_QueuedJob} or None 2047 @return: either None or the job object 2048 2049 """ 2050 try: 2051 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 2052 except (errors.JobFileCorrupted, EnvironmentError): 2053 logging.exception("Can't load/parse job %s", job_id) 2054 return None
2055
2056 - def _UpdateQueueSizeUnlocked(self):
2057 """Update the queue size. 2058 2059 """ 2060 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2061 2062 @locking.ssynchronized(_LOCK) 2063 @_RequireOpenQueue
2064 - def SetDrainFlag(self, drain_flag):
2065 """Sets the drain flag for the queue. 2066 2067 @type drain_flag: boolean 2068 @param drain_flag: Whether to set or unset the drain flag 2069 2070 """ 2071 jstore.SetDrainFlag(drain_flag) 2072 2073 self._drained = drain_flag 2074 2075 return True
2076 2077 @_RequireOpenQueue
2078 - def _SubmitJobUnlocked(self, job_id, ops):
2079 """Create and store a new job. 2080 2081 This enters the job into our job queue and also puts it on the new 2082 queue, in order for it to be picked up by the queue processors. 2083 2084 @type job_id: job ID 2085 @param job_id: the job ID for the new job 2086 @type ops: list 2087 @param ops: The list of OpCodes that will become the new job. 2088 @rtype: L{_QueuedJob} 2089 @return: the job object to be queued 2090 @raise errors.JobQueueFull: if the job queue has too many jobs in it 2091 @raise errors.GenericError: If an opcode is not valid 2092 2093 """ 2094 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: 2095 raise errors.JobQueueFull() 2096 2097 job = _QueuedJob(self, job_id, ops, True) 2098 2099 # Check priority 2100 for idx, op in enumerate(job.ops): 2101 if op.priority not in constants.OP_PRIO_SUBMIT_VALID: 2102 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2103 raise errors.GenericError("Opcode %s has invalid priority %s, allowed" 2104 " are %s" % (idx, op.priority, allowed)) 2105 2106 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) 2107 if not opcodes.TNoRelativeJobDependencies(dependencies): 2108 raise errors.GenericError("Opcode %s has invalid dependencies, must" 2109 " match %s: %s" % 2110 (idx, opcodes.TNoRelativeJobDependencies, 2111 dependencies)) 2112 2113 # Write to disk 2114 self.UpdateJobUnlocked(job) 2115 2116 self._queue_size += 1 2117 2118 logging.debug("Adding new job %s to the cache", job_id) 2119 self._memcache[job_id] = job 2120 2121 return job
2122 2123 @locking.ssynchronized(_LOCK) 2124 @_RequireOpenQueue 2125 @_RequireNonDrainedQueue
2126 - def SubmitJob(self, ops):
2127 """Create and store a new job. 2128 2129 @see: L{_SubmitJobUnlocked} 2130 2131 """ 2132 (job_id, ) = self._NewSerialsUnlocked(1) 2133 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) 2134 return job_id
2135 2136 @locking.ssynchronized(_LOCK) 2137 @_RequireOpenQueue 2138 @_RequireNonDrainedQueue
2139 - def SubmitManyJobs(self, jobs):
2140 """Create and store multiple jobs. 2141 2142 @see: L{_SubmitJobUnlocked} 2143 2144 """ 2145 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 2146 2147 (results, added_jobs) = \ 2148 self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) 2149 2150 self._EnqueueJobsUnlocked(added_jobs) 2151 2152 return results
2153 2154 @staticmethod
2155 - def _FormatSubmitError(msg, ops):
2156 """Formats errors which occurred while submitting a job. 2157 2158 """ 2159 return ("%s; opcodes %s" % 2160 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2161 2162 @staticmethod
2163 - def _ResolveJobDependencies(resolve_fn, deps):
2164 """Resolves relative job IDs in dependencies. 2165 2166 @type resolve_fn: callable 2167 @param resolve_fn: Function to resolve a relative job ID 2168 @type deps: list 2169 @param deps: Dependencies 2170 @rtype: list 2171 @return: Resolved dependencies 2172 2173 """ 2174 result = [] 2175 2176 for (dep_job_id, dep_status) in deps: 2177 if ht.TRelativeJobId(dep_job_id): 2178 assert ht.TInt(dep_job_id) and dep_job_id < 0 2179 try: 2180 job_id = resolve_fn(dep_job_id) 2181 except IndexError: 2182 # Abort 2183 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 2184 else: 2185 job_id = dep_job_id 2186 2187 result.append((job_id, dep_status)) 2188 2189 return (True, result)
2190
2191 - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2192 """Create and store multiple jobs. 2193 2194 @see: L{_SubmitJobUnlocked} 2195 2196 """ 2197 results = [] 2198 added_jobs = [] 2199 2200 def resolve_fn(job_idx, reljobid): 2201 assert reljobid < 0 2202 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2203 2204 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): 2205 for op in ops: 2206 if getattr(op, opcodes.DEPEND_ATTR, None): 2207 (status, data) = \ 2208 self._ResolveJobDependencies(compat.partial(resolve_fn, idx), 2209 op.depends) 2210 if not status: 2211 # Abort resolving dependencies 2212 assert ht.TNonEmptyString(data), "No error message" 2213 break 2214 # Use resolved dependencies 2215 op.depends = data 2216 else: 2217 try: 2218 job = self._SubmitJobUnlocked(job_id, ops) 2219 except errors.GenericError, err: 2220 status = False 2221 data = self._FormatSubmitError(str(err), ops) 2222 else: 2223 status = True 2224 data = job_id 2225 added_jobs.append(job) 2226 2227 results.append((status, data)) 2228 2229 return (results, added_jobs)
2230 2231 @locking.ssynchronized(_LOCK)
2232 - def _EnqueueJobs(self, jobs):
2233 """Helper function to add jobs to worker pool's queue. 2234 2235 @type jobs: list 2236 @param jobs: List of all jobs 2237 2238 """ 2239 return self._EnqueueJobsUnlocked(jobs)
2240
2241 - def _EnqueueJobsUnlocked(self, jobs):
2242 """Helper function to add jobs to worker pool's queue. 2243 2244 @type jobs: list 2245 @param jobs: List of all jobs 2246 2247 """ 2248 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 2249 self._wpool.AddManyTasks([(job, ) for job in jobs], 2250 priority=[job.CalcPriority() for job in jobs])
2251
2252 - def _GetJobStatusForDependencies(self, job_id):
2253 """Gets the status of a job for dependencies. 2254 2255 @type job_id: string 2256 @param job_id: Job ID 2257 @raise errors.JobLost: If job can't be found 2258 2259 """ 2260 if not isinstance(job_id, basestring): 2261 job_id = self._FormatJobID(job_id) 2262 2263 # Not using in-memory cache as doing so would require an exclusive lock 2264 2265 # Try to load from disk 2266 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2267 2268 assert not job.writable, "Got writable job" # pylint: disable=E1101 2269 2270 if job: 2271 return job.CalcStatus() 2272 2273 raise errors.JobLost("Job %s not found" % job_id)
2274 2275 @_RequireOpenQueue
2276 - def UpdateJobUnlocked(self, job, replicate=True):
2277 """Update a job's on disk storage. 2278 2279 After a job has been modified, this function needs to be called in 2280 order to write the changes to disk and replicate them to the other 2281 nodes. 2282 2283 @type job: L{_QueuedJob} 2284 @param job: the changed job 2285 @type replicate: boolean 2286 @param replicate: whether to replicate the change to remote nodes 2287 2288 """ 2289 if __debug__: 2290 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 2291 assert (finalized ^ (job.end_timestamp is None)) 2292 assert job.writable, "Can't update read-only job" 2293 2294 filename = self._GetJobPath(job.id) 2295 data = serializer.DumpJson(job.Serialize()) 2296 logging.debug("Writing job %s to %s", job.id, filename) 2297 self._UpdateJobQueueFile(filename, data, replicate)
2298
2299 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 2300 timeout):
2301 """Waits for changes in a job. 2302 2303 @type job_id: string 2304 @param job_id: Job identifier 2305 @type fields: list of strings 2306 @param fields: Which fields to check for changes 2307 @type prev_job_info: list or None 2308 @param prev_job_info: Last job information returned 2309 @type prev_log_serial: int 2310 @param prev_log_serial: Last job message serial number 2311 @type timeout: float 2312 @param timeout: maximum time to wait in seconds 2313 @rtype: tuple (job info, log entries) 2314 @return: a tuple of the job information as required via 2315 the fields parameter, and the log entries as a list 2316 2317 if the job has not changed and the timeout has expired, 2318 we instead return a special value, 2319 L{constants.JOB_NOTCHANGED}, which should be interpreted 2320 as such by the clients 2321 2322 """ 2323 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, 2324 writable=False) 2325 2326 helper = _WaitForJobChangesHelper() 2327 2328 return helper(self._GetJobPath(job_id), load_fn, 2329 fields, prev_job_info, prev_log_serial, timeout)
2330 2331 @locking.ssynchronized(_LOCK) 2332 @_RequireOpenQueue
2333 - def CancelJob(self, job_id):
2334 """Cancels a job. 2335 2336 This will only succeed if the job has not started yet. 2337 2338 @type job_id: string 2339 @param job_id: job ID of job to be cancelled. 2340 2341 """ 2342 logging.info("Cancelling job %s", job_id) 2343 2344 job = self._LoadJobUnlocked(job_id) 2345 if not job: 2346 logging.debug("Job %s not found", job_id) 2347 return (False, "Job %s not found" % job_id) 2348 2349 assert job.writable, "Can't cancel read-only job" 2350 2351 (success, msg) = job.Cancel() 2352 2353 if success: 2354 # If the job was finalized (e.g. cancelled), this is the final write 2355 # allowed. The job can be archived anytime. 2356 self.UpdateJobUnlocked(job) 2357 2358 return (success, msg)
2359 2360 @_RequireOpenQueue
2361 - def _ArchiveJobsUnlocked(self, jobs):
2362 """Archives jobs. 2363 2364 @type jobs: list of L{_QueuedJob} 2365 @param jobs: Job objects 2366 @rtype: int 2367 @return: Number of archived jobs 2368 2369 """ 2370 archive_jobs = [] 2371 rename_files = [] 2372 for job in jobs: 2373 assert job.writable, "Can't archive read-only job" 2374 2375 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2376 logging.debug("Job %s is not yet done", job.id) 2377 continue 2378 2379 archive_jobs.append(job) 2380 2381 old = self._GetJobPath(job.id) 2382 new = self._GetArchivedJobPath(job.id) 2383 rename_files.append((old, new)) 2384 2385 # TODO: What if 1..n files fail to rename? 2386 self._RenameFilesUnlocked(rename_files) 2387 2388 logging.debug("Successfully archived job(s) %s", 2389 utils.CommaJoin(job.id for job in archive_jobs)) 2390 2391 # Since we haven't quite checked, above, if we succeeded or failed renaming 2392 # the files, we update the cached queue size from the filesystem. When we 2393 # get around to fix the TODO: above, we can use the number of actually 2394 # archived jobs to fix this. 2395 self._UpdateQueueSizeUnlocked() 2396 return len(archive_jobs)
2397 2398 @locking.ssynchronized(_LOCK) 2399 @_RequireOpenQueue
2400 - def ArchiveJob(self, job_id):
2401 """Archives a job. 2402 2403 This is just a wrapper over L{_ArchiveJobsUnlocked}. 2404 2405 @type job_id: string 2406 @param job_id: Job ID of job to be archived. 2407 @rtype: bool 2408 @return: Whether job was archived 2409 2410 """ 2411 logging.info("Archiving job %s", job_id) 2412 2413 job = self._LoadJobUnlocked(job_id) 2414 if not job: 2415 logging.debug("Job %s not found", job_id) 2416 return False 2417 2418 return self._ArchiveJobsUnlocked([job]) == 1
2419 2420 @locking.ssynchronized(_LOCK) 2421 @_RequireOpenQueue
2422 - def AutoArchiveJobs(self, age, timeout):
2423 """Archives all jobs based on age. 2424 2425 The method will archive all jobs which are older than the age 2426 parameter. For jobs that don't have an end timestamp, the start 2427 timestamp will be considered. The special '-1' age will cause 2428 archival of all jobs (that are not running or queued). 2429 2430 @type age: int 2431 @param age: the minimum age in seconds 2432 2433 """ 2434 logging.info("Archiving jobs with age more than %s seconds", age) 2435 2436 now = time.time() 2437 end_time = now + timeout 2438 archived_count = 0 2439 last_touched = 0 2440 2441 all_job_ids = self._GetJobIDsUnlocked() 2442 pending = [] 2443 for idx, job_id in enumerate(all_job_ids): 2444 last_touched = idx + 1 2445 2446 # Not optimal because jobs could be pending 2447 # TODO: Measure average duration for job archival and take number of 2448 # pending jobs into account. 2449 if time.time() > end_time: 2450 break 2451 2452 # Returns None if the job failed to load 2453 job = self._LoadJobUnlocked(job_id) 2454 if job: 2455 if job.end_timestamp is None: 2456 if job.start_timestamp is None: 2457 job_age = job.received_timestamp 2458 else: 2459 job_age = job.start_timestamp 2460 else: 2461 job_age = job.end_timestamp 2462 2463 if age == -1 or now - job_age[0] > age: 2464 pending.append(job) 2465 2466 # Archive 10 jobs at a time 2467 if len(pending) >= 10: 2468 archived_count += self._ArchiveJobsUnlocked(pending) 2469 pending = [] 2470 2471 if pending: 2472 archived_count += self._ArchiveJobsUnlocked(pending) 2473 2474 return (archived_count, len(all_job_ids) - last_touched)
2475
2476 - def _Query(self, fields, qfilter):
2477 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2478 namefield="id") 2479 2480 job_ids = qobj.RequestedNames() 2481 2482 list_all = (job_ids is None) 2483 2484 if list_all: 2485 # Since files are added to/removed from the queue atomically, there's no 2486 # risk of getting the job ids in an inconsistent state. 2487 job_ids = self._GetJobIDsUnlocked() 2488 2489 jobs = [] 2490 2491 for job_id in job_ids: 2492 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2493 if job is not None or not list_all: 2494 jobs.append((job_id, job)) 2495 2496 return (qobj, jobs, list_all)
2497
2498 - def QueryJobs(self, fields, qfilter):
2499 """Returns a list of jobs in queue. 2500 2501 @type fields: sequence 2502 @param fields: List of wanted fields 2503 @type qfilter: None or query2 filter (list) 2504 @param qfilter: Query filter 2505 2506 """ 2507 (qobj, ctx, sort_by_name) = self._Query(fields, qfilter) 2508 2509 return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
2510
2511 - def OldStyleQueryJobs(self, job_ids, fields):
2512 """Returns a list of jobs in queue. 2513 2514 @type job_ids: list 2515 @param job_ids: sequence of job identifiers or None for all 2516 @type fields: list 2517 @param fields: names of fields to return 2518 @rtype: list 2519 @return: list one element per job, each element being list with 2520 the requested fields 2521 2522 """ 2523 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2524 2525 (qobj, ctx, sort_by_name) = self._Query(fields, qfilter) 2526 2527 return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
2528 2529 @locking.ssynchronized(_LOCK)
2530 - def PrepareShutdown(self):
2531 """Prepare to stop the job queue. 2532 2533 Disables execution of jobs in the workerpool and returns whether there are 2534 any jobs currently running. If the latter is the case, the job queue is not 2535 yet ready for shutdown. Once this function returns C{True} L{Shutdown} can 2536 be called without interfering with any job. Queued and unfinished jobs will 2537 be resumed next time. 2538 2539 Once this function has been called no new job submissions will be accepted 2540 (see L{_RequireNonDrainedQueue}). 2541 2542 @rtype: bool 2543 @return: Whether there are any running jobs 2544 2545 """ 2546 if self._accepting_jobs: 2547 self._accepting_jobs = False 2548 2549 # Tell worker pool to stop processing pending tasks 2550 self._wpool.SetActive(False) 2551 2552 return self._wpool.HasRunningTasks()
2553
2554 - def AcceptingJobsUnlocked(self):
2555 """Returns whether jobs are accepted. 2556 2557 Once L{PrepareShutdown} has been called, no new jobs are accepted and the 2558 queue is shutting down. 2559 2560 @rtype: bool 2561 2562 """ 2563 return self._accepting_jobs
2564 2565 @locking.ssynchronized(_LOCK) 2566 @_RequireOpenQueue
2567 - def Shutdown(self):
2568 """Stops the job queue. 2569 2570 This shutdowns all the worker threads an closes the queue. 2571 2572 """ 2573 self._wpool.TerminateWorkers() 2574 2575 self._queue_filelock.Close() 2576 self._queue_filelock = None
2577