
Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  34  used by all other classes in this module. 
  35   
  36  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  37      processing jobs 
  38   
  39  """ 
  40   
  41  import logging 
  42  import errno 
  43  import time 
  44  import weakref 
  45  import threading 
  46  import itertools 
  47  import operator 
  48   
  49  try: 
  50    # pylint: disable=E0611 
  51    from pyinotify import pyinotify 
  52  except ImportError: 
  53    import pyinotify 
  54   
  55  from ganeti import asyncnotifier 
  56  from ganeti import constants 
  57  from ganeti import serializer 
  58  from ganeti import workerpool 
  59  from ganeti import locking 
  60  from ganeti import opcodes 
  61  from ganeti import opcodes_base 
  62  from ganeti import errors 
  63  from ganeti import mcpu 
  64  from ganeti import utils 
  65  from ganeti import jstore 
  66  from ganeti import rpc 
  67  from ganeti import runtime 
  68  from ganeti import netutils 
  69  from ganeti import compat 
  70  from ganeti import ht 
  71  from ganeti import query 
  72  from ganeti import qlang 
  73  from ganeti import pathutils 
  74  from ganeti import vcluster 
  75   
  76   
  77  JOBQUEUE_THREADS = 25 
  78   
  79  # member lock names to be passed to @ssynchronized decorator 
  80  _LOCK = "_lock" 
  81  _QUEUE = "_queue" 
  82   
  83  #: Retrieves "id" attribute 
  84  _GetIdAttr = operator.attrgetter("id") 
85 86 87 -class CancelJob(Exception):
88 """Special exception to cancel a job. 89 90 """
91
92 93 -class QueueShutdown(Exception):
94 """Special exception to abort a job when the job queue is shutting down. 95 96 """
97
98 99 -def TimeStampNow():
100 """Returns the current timestamp. 101 102 @rtype: tuple 103 @return: the current time in the (seconds, microseconds) format 104 105 """ 106 return utils.SplitTime(time.time())
107
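For illustration, a standalone sketch of the (seconds, microseconds) split described above; split_time is a hypothetical stand-in for utils.SplitTime and only mirrors the documented return format.

def split_time(value):
    # Hypothetical stand-in for utils.SplitTime: split a float timestamp
    # into whole seconds and the remaining microseconds.
    seconds = int(value)
    microseconds = int(round((value - seconds) * 1000000))
    return (seconds, microseconds)

# split_time(1234567890.123456) == (1234567890, 123456)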
108 109 -def _CallJqUpdate(runner, names, file_name, content):
110 """Updates job queue file after virtualizing filename. 111 112 """ 113 virt_file_name = vcluster.MakeVirtualPath(file_name) 114 return runner.call_jobqueue_update(names, virt_file_name, content)
115
116 117 -class _SimpleJobQuery:
118 """Wrapper for job queries. 119 120 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. 121 122 """
123 - def __init__(self, fields):
124 """Initializes this class. 125 126 """ 127 self._query = query.Query(query.JOB_FIELDS, fields)
128
129 - def __call__(self, job):
130 """Executes a job query using cached field list. 131 132 """ 133 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
134
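A brief usage sketch of the wrapper above (illustrative only; the field names "id" and "status" and the job variable are assumptions, not taken from this module):

# Build the query object once so the field list is kept cached ...
status_query = _SimpleJobQuery(["id", "status"])
# ... then apply it to any number of _QueuedJob instances.
row = status_query(job)   # old-style result row for one job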
135 136 -class _QueuedOpCode(object):
137 """Encapsulates an opcode object. 138 139 @ivar log: holds the execution log and consists of tuples 140 of the form C{(log_serial, timestamp, level, message)} 141 @ivar input: the OpCode we encapsulate 142 @ivar status: the current status 143 @ivar result: the result of the LU execution 144 @ivar start_timestamp: timestamp for the start of the execution 145 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 146 @ivar stop_timestamp: timestamp for the end of the execution 147 148 """ 149 __slots__ = ["input", "status", "result", "log", "priority", 150 "start_timestamp", "exec_timestamp", "end_timestamp", 151 "__weakref__"] 152
153 - def __init__(self, op):
154 """Initializes instances of this class. 155 156 @type op: L{opcodes.OpCode} 157 @param op: the opcode we encapsulate 158 159 """ 160 self.input = op 161 self.status = constants.OP_STATUS_QUEUED 162 self.result = None 163 self.log = [] 164 self.start_timestamp = None 165 self.exec_timestamp = None 166 self.end_timestamp = None 167 168 # Get initial priority (it might change during the lifetime of this opcode) 169 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
170 171 @classmethod
172 - def Restore(cls, state):
173 """Restore the _QueuedOpCode from the serialized form. 174 175 @type state: dict 176 @param state: the serialized state 177 @rtype: _QueuedOpCode 178 @return: a new _QueuedOpCode instance 179 180 """ 181 obj = _QueuedOpCode.__new__(cls) 182 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 183 obj.status = state["status"] 184 obj.result = state["result"] 185 obj.log = state["log"] 186 obj.start_timestamp = state.get("start_timestamp", None) 187 obj.exec_timestamp = state.get("exec_timestamp", None) 188 obj.end_timestamp = state.get("end_timestamp", None) 189 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 190 return obj
191
192 - def Serialize(self):
193 """Serializes this _QueuedOpCode. 194 195 @rtype: dict 196 @return: the dictionary holding the serialized state 197 198 """ 199 return { 200 "input": self.input.__getstate__(), 201 "status": self.status, 202 "result": self.result, 203 "log": self.log, 204 "start_timestamp": self.start_timestamp, 205 "exec_timestamp": self.exec_timestamp, 206 "end_timestamp": self.end_timestamp, 207 "priority": self.priority, 208 }
209
210 211 -class _QueuedJob(object):
212 """In-memory job representation. 213 214 This is what we use to track the user-submitted jobs. Locking must 215 be taken care of by users of this class. 216 217 @type queue: L{JobQueue} 218 @ivar queue: the parent queue 219 @ivar id: the job ID 220 @type ops: list 221 @ivar ops: the list of _QueuedOpCode that constitute the job 222 @type log_serial: int 223 @ivar log_serial: holds the index for the next log entry 224 @ivar received_timestamp: the timestamp for when the job was received 225 @ivar start_timestamp: the timestamp for start of execution 226 @ivar end_timestamp: the timestamp for end of execution 227 @ivar writable: Whether the job is allowed to be modified 228 229 """ 230 # pylint: disable=W0212 231 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 232 "received_timestamp", "start_timestamp", "end_timestamp", 233 "__weakref__", "processor_lock", "writable", "archived"] 234
235 - def _AddReasons(self):
236 """Extend the reason trail 237 238 Add the reason for all the opcodes of this job to be executed. 239 240 """ 241 count = 0 242 for queued_op in self.ops: 243 op = queued_op.input 244 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__) 245 reason_text = "job=%d;index=%d" % (self.id, count) 246 reason = getattr(op, "reason", []) 247 reason.append((reason_src, reason_text, utils.EpochNano())) 248 op.reason = reason 249 count = count + 1
250
251 - def __init__(self, queue, job_id, ops, writable):
252 """Constructor for the _QueuedJob. 253 254 @type queue: L{JobQueue} 255 @param queue: our parent queue 256 @type job_id: job_id 257 @param job_id: our job id 258 @type ops: list 259 @param ops: the list of opcodes we hold, which will be encapsulated 260 in _QueuedOpCodes 261 @type writable: bool 262 @param writable: Whether job can be modified 263 264 """ 265 if not ops: 266 raise errors.GenericError("A job needs at least one opcode") 267 268 self.queue = queue 269 self.id = int(job_id) 270 self.ops = [_QueuedOpCode(op) for op in ops] 271 self._AddReasons() 272 self.log_serial = 0 273 self.received_timestamp = TimeStampNow() 274 self.start_timestamp = None 275 self.end_timestamp = None 276 self.archived = False 277 278 self._InitInMemory(self, writable) 279 280 assert not self.archived, "New jobs can not be marked as archived"
281 282 @staticmethod
283 - def _InitInMemory(obj, writable):
284 """Initializes in-memory variables. 285 286 """ 287 obj.writable = writable 288 obj.ops_iter = None 289 obj.cur_opctx = None 290 291 # Read-only jobs are not processed and therefore don't need a lock 292 if writable: 293 obj.processor_lock = threading.Lock() 294 else: 295 obj.processor_lock = None
296
297 - def __repr__(self):
298 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 299 "id=%s" % self.id, 300 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 301 302 return "<%s at %#x>" % (" ".join(status), id(self))
303 304 @classmethod
305 - def Restore(cls, queue, state, writable, archived):
306 """Restore a _QueuedJob from serialized state. 307 308 @type queue: L{JobQueue} 309 @param queue: to which queue the restored job belongs 310 @type state: dict 311 @param state: the serialized state 312 @type writable: bool 313 @param writable: Whether job can be modified 314 @type archived: bool 315 @param archived: Whether job was already archived 316 @rtype: _QueuedJob 317 @return: the restored _QueuedJob instance 318 319 """ 320 obj = _QueuedJob.__new__(cls) 321 obj.queue = queue 322 obj.id = int(state["id"]) 323 obj.received_timestamp = state.get("received_timestamp", None) 324 obj.start_timestamp = state.get("start_timestamp", None) 325 obj.end_timestamp = state.get("end_timestamp", None) 326 obj.archived = archived 327 328 obj.ops = [] 329 obj.log_serial = 0 330 for op_state in state["ops"]: 331 op = _QueuedOpCode.Restore(op_state) 332 for log_entry in op.log: 333 obj.log_serial = max(obj.log_serial, log_entry[0]) 334 obj.ops.append(op) 335 336 cls._InitInMemory(obj, writable) 337 338 return obj
339
340 - def Serialize(self):
341 """Serialize the _QueuedJob instance. 342 343 @rtype: dict 344 @return: the serialized state 345 346 """ 347 return { 348 "id": self.id, 349 "ops": [op.Serialize() for op in self.ops], 350 "start_timestamp": self.start_timestamp, 351 "end_timestamp": self.end_timestamp, 352 "received_timestamp": self.received_timestamp, 353 }
354
355 - def CalcStatus(self):
356 """Compute the status of this job. 357 358 This function iterates over all the _QueuedOpCodes in the job and 359 based on their status, computes the job status. 360 361 The algorithm is: 362 - if we find a cancelled opcode, or one finished with error, the job 363 status will be the same 364 - otherwise, the last opcode with the status one of: 365 - waitlock 366 - canceling 367 - running 368 369 will determine the job status 370 371 - otherwise, it means either all opcodes are queued, or success, 372 and the job status will be the same 373 374 @return: the job status 375 376 """ 377 status = constants.JOB_STATUS_QUEUED 378 379 all_success = True 380 for op in self.ops: 381 if op.status == constants.OP_STATUS_SUCCESS: 382 continue 383 384 all_success = False 385 386 if op.status == constants.OP_STATUS_QUEUED: 387 pass 388 elif op.status == constants.OP_STATUS_WAITING: 389 status = constants.JOB_STATUS_WAITING 390 elif op.status == constants.OP_STATUS_RUNNING: 391 status = constants.JOB_STATUS_RUNNING 392 elif op.status == constants.OP_STATUS_CANCELING: 393 status = constants.JOB_STATUS_CANCELING 394 break 395 elif op.status == constants.OP_STATUS_ERROR: 396 status = constants.JOB_STATUS_ERROR 397 # The whole job fails if one opcode failed 398 break 399 elif op.status == constants.OP_STATUS_CANCELED: 400 status = constants.JOB_STATUS_CANCELED 401 break 402 403 if all_success: 404 status = constants.JOB_STATUS_SUCCESS 405 406 return status
407
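The reduction performed by CalcStatus can be illustrated standalone; the sketch below uses plain strings instead of the constants module and is only meant to mirror the documented algorithm.

def calc_status_sketch(op_statuses):
    # Error/cancel wins, otherwise the last waiting/running opcode decides,
    # otherwise the job stays queued; a job whose opcodes all succeeded is
    # itself a success.
    status = "queued"
    all_success = True
    for op_status in op_statuses:
        if op_status == "success":
            continue
        all_success = False
        if op_status in ("waiting", "running", "canceling", "error", "canceled"):
            status = op_status
            if op_status in ("canceling", "error", "canceled"):
                break
    if all_success:
        status = "success"
    return status

# calc_status_sketch(["success", "running", "queued"]) == "running"
# calc_status_sketch(["success", "error", "queued"]) == "error"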
408 - def CalcPriority(self):
409 """Gets the current priority for this job. 410 411 Only unfinished opcodes are considered. When all are done, the default 412 priority is used. 413 414 @rtype: int 415 416 """ 417 priorities = [op.priority for op in self.ops 418 if op.status not in constants.OPS_FINALIZED] 419 420 if not priorities: 421 # All opcodes are done, assume default priority 422 return constants.OP_PRIO_DEFAULT 423 424 return min(priorities)
425
426 - def GetLogEntries(self, newer_than):
427 """Selectively returns the log entries. 428 429 @type newer_than: None or int 430 @param newer_than: if this is None, return all log entries, 431 otherwise return only the log entries with serial higher 432 than this value 433 @rtype: list 434 @return: the list of the log entries selected 435 436 """ 437 if newer_than is None: 438 serial = -1 439 else: 440 serial = newer_than 441 442 entries = [] 443 for op in self.ops: 444 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 445 446 return entries
447
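To make the serial filter above concrete, a standalone sketch using hypothetical log tuples in the (log_serial, timestamp, level, message) format documented for _QueuedOpCode:

log = [(1, (1400000000, 0), "message", "step 1"),
       (2, (1400000001, 0), "message", "step 2")]
newer_than = 1
serial = -1 if newer_than is None else newer_than
entries = [entry for entry in log if entry[0] > serial]
# entries == [(2, (1400000001, 0), "message", "step 2")]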
448 - def GetInfo(self, fields):
449 """Returns information about a job. 450 451 @type fields: list 452 @param fields: names of fields to return 453 @rtype: list 454 @return: list with one element for each field 455 @raise errors.OpExecError: when an invalid field 456 has been passed 457 458 """ 459 return _SimpleJobQuery(fields)(self)
460
461 - def MarkUnfinishedOps(self, status, result):
462 """Mark unfinished opcodes with a given status and result. 463 464 This is a utility function for marking all running or waiting to 465 be run opcodes with a given status. Opcodes which are already 466 finalized are not changed. 467 468 @param status: a given opcode status 469 @param result: the opcode result 470 471 """ 472 not_marked = True 473 for op in self.ops: 474 if op.status in constants.OPS_FINALIZED: 475 assert not_marked, "Finalized opcodes found after non-finalized ones" 476 continue 477 op.status = status 478 op.result = result 479 not_marked = False
480
481 - def Finalize(self):
482 """Marks the job as finalized. 483 484 """ 485 self.end_timestamp = TimeStampNow()
486
487 - def Cancel(self):
488 """Marks job as canceled/-ing if possible. 489 490 @rtype: tuple; (bool, string) 491 @return: Boolean describing whether job was successfully canceled or marked 492 as canceling and a text message 493 494 """ 495 status = self.CalcStatus() 496 497 if status == constants.JOB_STATUS_QUEUED: 498 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 499 "Job canceled by request") 500 self.Finalize() 501 return (True, "Job %s canceled" % self.id) 502 503 elif status == constants.JOB_STATUS_WAITING: 504 # The worker will notice the new status and cancel the job 505 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 506 return (True, "Job %s will be canceled" % self.id) 507 508 else: 509 logging.debug("Job %s is no longer waiting in the queue", self.id) 510 return (False, "Job %s is no longer waiting in the queue" % self.id)
511
512 - def ChangePriority(self, priority):
513 """Changes the job priority. 514 515 @type priority: int 516 @param priority: New priority 517 @rtype: tuple; (bool, string) 518 @return: Boolean describing whether job's priority was successfully changed 519 and a text message 520 521 """ 522 status = self.CalcStatus() 523 524 if status in constants.JOBS_FINALIZED: 525 return (False, "Job %s is finished" % self.id) 526 elif status == constants.JOB_STATUS_CANCELING: 527 return (False, "Job %s is cancelling" % self.id) 528 else: 529 assert status in (constants.JOB_STATUS_QUEUED, 530 constants.JOB_STATUS_WAITING, 531 constants.JOB_STATUS_RUNNING) 532 533 changed = False 534 for op in self.ops: 535 if (op.status == constants.OP_STATUS_RUNNING or 536 op.status in constants.OPS_FINALIZED): 537 assert not changed, \ 538 ("Found opcode for which priority should not be changed after" 539 " priority has been changed for previous opcodes") 540 continue 541 542 assert op.status in (constants.OP_STATUS_QUEUED, 543 constants.OP_STATUS_WAITING) 544 545 changed = True 546 547 # Set new priority (doesn't modify opcode input) 548 op.priority = priority 549 550 if changed: 551 return (True, ("Priorities of pending opcodes for job %s have been" 552 " changed to %s" % (self.id, priority))) 553 else: 554 return (False, "Job %s had no pending opcodes" % self.id)
555
556 557 -class _OpExecCallbacks(mcpu.OpExecCbBase):
558
559 - def __init__(self, queue, job, op):
560 """Initializes this class. 561 562 @type queue: L{JobQueue} 563 @param queue: Job queue 564 @type job: L{_QueuedJob} 565 @param job: Job object 566 @type op: L{_QueuedOpCode} 567 @param op: OpCode 568 569 """ 570 super(_OpExecCallbacks, self).__init__() 571 572 assert queue, "Queue is missing" 573 assert job, "Job is missing" 574 assert op, "Opcode is missing" 575 576 self._queue = queue 577 self._job = job 578 self._op = op
579
580 - def _CheckCancel(self):
581 """Raises an exception to cancel the job if asked to. 582 583 """ 584 # Cancel here if we were asked to 585 if self._op.status == constants.OP_STATUS_CANCELING: 586 logging.debug("Canceling opcode") 587 raise CancelJob() 588 589 # See if queue is shutting down 590 if not self._queue.AcceptingJobsUnlocked(): 591 logging.debug("Queue is shutting down") 592 raise QueueShutdown()
593 594 @locking.ssynchronized(_QUEUE, shared=1)
595 - def NotifyStart(self):
596 """Mark the opcode as running, not lock-waiting. 597 598 This is called from the mcpu code as a notifier function, when the LU is 599 finally about to start the Exec() method. Of course, to have end-user 600 visible results, the opcode must be initially (before calling into 601 Processor.ExecOpCode) set to OP_STATUS_WAITING. 602 603 """ 604 assert self._op in self._job.ops 605 assert self._op.status in (constants.OP_STATUS_WAITING, 606 constants.OP_STATUS_CANCELING) 607 608 # Cancel here if we were asked to 609 self._CheckCancel() 610 611 logging.debug("Opcode is now running") 612 613 self._op.status = constants.OP_STATUS_RUNNING 614 self._op.exec_timestamp = TimeStampNow() 615 616 # And finally replicate the job status 617 self._queue.UpdateJobUnlocked(self._job)
618 619 @locking.ssynchronized(_QUEUE, shared=1)
620 - def _AppendFeedback(self, timestamp, log_type, log_msg):
621 """Internal feedback append function, with locks 622 623 """ 624 self._job.log_serial += 1 625 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 626 self._queue.UpdateJobUnlocked(self._job, replicate=False)
627
628 - def Feedback(self, *args):
629 """Append a log entry. 630 631 """ 632 assert len(args) < 3 633 634 if len(args) == 1: 635 log_type = constants.ELOG_MESSAGE 636 log_msg = args[0] 637 else: 638 (log_type, log_msg) = args 639 640 # The time is split to make serialization easier and not lose 641 # precision. 642 timestamp = utils.SplitTime(time.time()) 643 self._AppendFeedback(timestamp, log_type, log_msg)
644
645 - def CurrentPriority(self):
646 """Returns current priority for opcode. 647 648 """ 649 assert self._op.status in (constants.OP_STATUS_WAITING, 650 constants.OP_STATUS_CANCELING) 651 652 # Cancel here if we were asked to 653 self._CheckCancel() 654 655 return self._op.priority
656
657 - def SubmitManyJobs(self, jobs):
658 """Submits jobs for processing. 659 660 See L{JobQueue.SubmitManyJobs}. 661 662 """ 663 # Locking is done in job queue 664 return self._queue.SubmitManyJobs(jobs)
665
666 667 -class _JobChangesChecker(object):
668 - def __init__(self, fields, prev_job_info, prev_log_serial):
669 """Initializes this class. 670 671 @type fields: list of strings 672 @param fields: Fields requested by LUXI client 673 @type prev_job_info: string 674 @param prev_job_info: previous job info, as passed by the LUXI client 675 @type prev_log_serial: string 676 @param prev_log_serial: previous job serial, as passed by the LUXI client 677 678 """ 679 self._squery = _SimpleJobQuery(fields) 680 self._prev_job_info = prev_job_info 681 self._prev_log_serial = prev_log_serial
682
683 - def __call__(self, job):
684 """Checks whether job has changed. 685 686 @type job: L{_QueuedJob} 687 @param job: Job object 688 689 """ 690 assert not job.writable, "Expected read-only job" 691 692 status = job.CalcStatus() 693 job_info = self._squery(job) 694 log_entries = job.GetLogEntries(self._prev_log_serial) 695 696 # Serializing and deserializing data can cause type changes (e.g. from 697 # tuple to list) or precision loss. We're doing it here so that we get 698 # the same modifications as the data received from the client. Without 699 # this, the comparison afterwards might fail without the data being 700 # significantly different. 701 # TODO: we just deserialized from disk, investigate how to make sure that 702 # the job info and log entries are compatible to avoid this further step. 703 # TODO: Doing something like in testutils.py:UnifyValueType might be more 704 # efficient, though floats will be tricky 705 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 706 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 707 708 # Don't even try to wait if the job is no longer running, there will be 709 # no changes. 710 if (status not in (constants.JOB_STATUS_QUEUED, 711 constants.JOB_STATUS_RUNNING, 712 constants.JOB_STATUS_WAITING) or 713 job_info != self._prev_job_info or 714 (log_entries and self._prev_log_serial != log_entries[0][0])): 715 logging.debug("Job %s changed", job.id) 716 return (job_info, log_entries) 717 718 return None
719
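The normalization mentioned in the comment above can be reproduced standalone with the standard json module (a rough equivalent of one DumpJson/LoadJson round trip):

import json

before = (1, (2, 3))                    # tuples as built in memory
after = json.loads(json.dumps(before))  # what the data looks like after transport
# after == [1, [2, 3]]: tuples silently become lists, so comparing raw in-memory
# values against client-supplied data could report a spurious change.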
720 721 -class _JobFileChangesWaiter(object):
722 - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
723 """Initializes this class. 724 725 @type filename: string 726 @param filename: Path to job file 727 @raises errors.InotifyError: if the notifier cannot be setup 728 729 """ 730 self._wm = _inotify_wm_cls() 731 self._inotify_handler = \ 732 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 733 self._notifier = \ 734 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 735 try: 736 self._inotify_handler.enable() 737 except Exception: 738 # pyinotify doesn't close file descriptors automatically 739 self._notifier.stop() 740 raise
741
742 - def _OnInotify(self, notifier_enabled):
743 """Callback for inotify. 744 745 """ 746 if not notifier_enabled: 747 self._inotify_handler.enable()
748
749 - def Wait(self, timeout):
750 """Waits for the job file to change. 751 752 @type timeout: float 753 @param timeout: Timeout in seconds 754 @return: Whether there have been events 755 756 """ 757 assert timeout >= 0 758 have_events = self._notifier.check_events(timeout * 1000) 759 if have_events: 760 self._notifier.read_events() 761 self._notifier.process_events() 762 return have_events
763
764 - def Close(self):
765 """Closes underlying notifier and its file descriptor. 766 767 """ 768 self._notifier.stop()
769
770 771 -class _JobChangesWaiter(object):
772 - def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
773 """Initializes this class. 774 775 @type filename: string 776 @param filename: Path to job file 777 778 """ 779 self._filewaiter = None 780 self._filename = filename 781 self._waiter_cls = _waiter_cls
782
783 - def Wait(self, timeout):
784 """Waits for a job to change. 785 786 @type timeout: float 787 @param timeout: Timeout in seconds 788 @return: Whether there have been events 789 790 """ 791 if self._filewaiter: 792 return self._filewaiter.Wait(timeout) 793 794 # Lazy setup: Avoid inotify setup cost when job file has already changed. 795 # If this point is reached, return immediately and let caller check the job 796 # file again in case there were changes since the last check. This avoids a 797 # race condition. 798 self._filewaiter = self._waiter_cls(self._filename) 799 800 return True
801
802 - def Close(self):
803 """Closes underlying waiter. 804 805 """ 806 if self._filewaiter: 807 self._filewaiter.Close()
808
809 810 -class _WaitForJobChangesHelper(object):
811 """Helper class using inotify to wait for changes in a job file. 812 813 This class takes a previous job status and serial, and alerts the client when 814 the current job status has changed. 815 816 """ 817 @staticmethod
818 - def _CheckForChanges(counter, job_load_fn, check_fn):
819 if counter.next() > 0: 820 # If this isn't the first check the job is given some more time to change 821 # again. This gives better performance for jobs generating many 822 # changes/messages. 823 time.sleep(0.1) 824 825 job = job_load_fn() 826 if not job: 827 raise errors.JobLost() 828 829 result = check_fn(job) 830 if result is None: 831 raise utils.RetryAgain() 832 833 return result
834
835 - def __call__(self, filename, job_load_fn, 836 fields, prev_job_info, prev_log_serial, timeout, 837 _waiter_cls=_JobChangesWaiter):
838 """Waits for changes on a job. 839 840 @type filename: string 841 @param filename: File on which to wait for changes 842 @type job_load_fn: callable 843 @param job_load_fn: Function to load job 844 @type fields: list of strings 845 @param fields: Which fields to check for changes 846 @type prev_job_info: list or None 847 @param prev_job_info: Last job information returned 848 @type prev_log_serial: int 849 @param prev_log_serial: Last job message serial number 850 @type timeout: float 851 @param timeout: maximum time to wait in seconds 852 853 """ 854 counter = itertools.count() 855 try: 856 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 857 waiter = _waiter_cls(filename) 858 try: 859 return utils.Retry(compat.partial(self._CheckForChanges, 860 counter, job_load_fn, check_fn), 861 utils.RETRY_REMAINING_TIME, timeout, 862 wait_fn=waiter.Wait) 863 finally: 864 waiter.Close() 865 except errors.JobLost: 866 return None 867 except utils.RetryTimeout: 868 return constants.JOB_NOTCHANGED
869
870 871 -def _EncodeOpError(err):
872 """Encodes an error which occurred while processing an opcode. 873 874 """ 875 if isinstance(err, errors.GenericError): 876 to_encode = err 877 else: 878 to_encode = errors.OpExecError(str(err)) 879 880 return errors.EncodeException(to_encode)
881
882 883 -class _TimeoutStrategyWrapper:
884 - def __init__(self, fn):
885 """Initializes this class. 886 887 """ 888 self._fn = fn 889 self._next = None
890
891 - def _Advance(self):
892 """Gets the next timeout if necessary. 893 894 """ 895 if self._next is None: 896 self._next = self._fn()
897
898 - def Peek(self):
899 """Returns the next timeout. 900 901 """ 902 self._Advance() 903 return self._next
904
905 - def Next(self):
906 """Returns the current timeout and advances the internal state. 907 908 """ 909 self._Advance() 910 result = self._next 911 self._next = None 912 return result
913
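An illustrative usage sketch of the wrapper above (not part of the module): Peek() keeps returning the cached timeout until Next() consumes it.

timeouts = iter([0.5, 1.0, None])
wrapper = _TimeoutStrategyWrapper(lambda: next(timeouts))
assert wrapper.Peek() == 0.5   # computed lazily and cached
assert wrapper.Peek() == 0.5   # still the same pending value
assert wrapper.Next() == 0.5   # consumed; the internal state is reset
assert wrapper.Peek() == 1.0   # the next value from the factory function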
914 915 -class _OpExecContext:
916 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
917 """Initializes this class. 918 919 """ 920 self.op = op 921 self.index = index 922 self.log_prefix = log_prefix 923 self.summary = op.input.Summary() 924 925 # Create local copy to modify 926 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 927 self.jobdeps = op.input.depends[:] 928 else: 929 self.jobdeps = None 930 931 self._timeout_strategy_factory = timeout_strategy_factory 932 self._ResetTimeoutStrategy()
933
934 - def _ResetTimeoutStrategy(self):
935 """Creates a new timeout strategy. 936 937 """ 938 self._timeout_strategy = \ 939 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
940
941 - def CheckPriorityIncrease(self):
942 """Checks whether priority can and should be increased. 943 944 Called when locks couldn't be acquired. 945 946 """ 947 op = self.op 948 949 # Exhausted all retries and next round should not use blocking acquire 950 # for locks? 951 if (self._timeout_strategy.Peek() is None and 952 op.priority > constants.OP_PRIO_HIGHEST): 953 logging.debug("Increasing priority") 954 op.priority -= 1 955 self._ResetTimeoutStrategy() 956 return True 957 958 return False
959
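Priorities in this module are numeric with lower values meaning higher priority, which is why the decrement above raises the opcode's priority; a sketch with assumed nice-style constant values (the exact numbers are not taken from this listing):

OP_PRIO_HIGHEST_ASSUMED = -20   # assumption: highest priority is the numeric minimum
priority = 0                    # assumption: default priority
while priority > OP_PRIO_HIGHEST_ASSUMED:
    priority -= 1               # each exhausted lock-timeout round bumps priority up
# priority == -20: the opcode can no longer be boosted any further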
960 - def GetNextLockTimeout(self):
961 """Returns the next lock acquire timeout. 962 963 """ 964 return self._timeout_strategy.Next()
965
966 967 -class _JobProcessor(object):
968 (DEFER, 969 WAITDEP, 970 FINISHED) = range(1, 4) 971
972 - def __init__(self, queue, opexec_fn, job, 973 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
974 """Initializes this class. 975 976 """ 977 self.queue = queue 978 self.opexec_fn = opexec_fn 979 self.job = job 980 self._timeout_strategy_factory = _timeout_strategy_factory
981 982 @staticmethod
983 - def _FindNextOpcode(job, timeout_strategy_factory):
984 """Locates the next opcode to run. 985 986 @type job: L{_QueuedJob} 987 @param job: Job object 988 @param timeout_strategy_factory: Callable to create new timeout strategy 989 990 """ 991 # Create some sort of a cache to speed up locating next opcode for future 992 # lookups 993 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 994 # pending and one for processed ops. 995 if job.ops_iter is None: 996 job.ops_iter = enumerate(job.ops) 997 998 # Find next opcode to run 999 while True: 1000 try: 1001 (idx, op) = job.ops_iter.next() 1002 except StopIteration: 1003 raise errors.ProgrammerError("Called for a finished job") 1004 1005 if op.status == constants.OP_STATUS_RUNNING: 1006 # Found an opcode already marked as running 1007 raise errors.ProgrammerError("Called for job marked as running") 1008 1009 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 1010 timeout_strategy_factory) 1011 1012 if op.status not in constants.OPS_FINALIZED: 1013 return opctx 1014 1015 # This is a job that was partially completed before master daemon 1016 # shutdown, so it can be expected that some opcodes are already 1017 # completed successfully (if any did error out, then the whole job 1018 # should have been aborted and not resubmitted for processing). 1019 logging.info("%s: opcode %s already processed, skipping", 1020 opctx.log_prefix, opctx.summary)
1021 1022 @staticmethod
1023 - def _MarkWaitlock(job, op):
1024 """Marks an opcode as waiting for locks. 1025 1026 The job's start timestamp is also set if necessary. 1027 1028 @type job: L{_QueuedJob} 1029 @param job: Job object 1030 @type op: L{_QueuedOpCode} 1031 @param op: Opcode object 1032 1033 """ 1034 assert op in job.ops 1035 assert op.status in (constants.OP_STATUS_QUEUED, 1036 constants.OP_STATUS_WAITING) 1037 1038 update = False 1039 1040 op.result = None 1041 1042 if op.status == constants.OP_STATUS_QUEUED: 1043 op.status = constants.OP_STATUS_WAITING 1044 update = True 1045 1046 if op.start_timestamp is None: 1047 op.start_timestamp = TimeStampNow() 1048 update = True 1049 1050 if job.start_timestamp is None: 1051 job.start_timestamp = op.start_timestamp 1052 update = True 1053 1054 assert op.status == constants.OP_STATUS_WAITING 1055 1056 return update
1057 1058 @staticmethod
1059 - def _CheckDependencies(queue, job, opctx):
1060 """Checks if an opcode has dependencies and if so, processes them. 1061 1062 @type queue: L{JobQueue} 1063 @param queue: Queue object 1064 @type job: L{_QueuedJob} 1065 @param job: Job object 1066 @type opctx: L{_OpExecContext} 1067 @param opctx: Opcode execution context 1068 @rtype: bool 1069 @return: Whether opcode will be re-scheduled by dependency tracker 1070 1071 """ 1072 op = opctx.op 1073 1074 result = False 1075 1076 while opctx.jobdeps: 1077 (dep_job_id, dep_status) = opctx.jobdeps[0] 1078 1079 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 1080 dep_status) 1081 assert ht.TNonEmptyString(depmsg), "No dependency message" 1082 1083 logging.info("%s: %s", opctx.log_prefix, depmsg) 1084 1085 if depresult == _JobDependencyManager.CONTINUE: 1086 # Remove dependency and continue 1087 opctx.jobdeps.pop(0) 1088 1089 elif depresult == _JobDependencyManager.WAIT: 1090 # Need to wait for notification, dependency tracker will re-add job 1091 # to workerpool 1092 result = True 1093 break 1094 1095 elif depresult == _JobDependencyManager.CANCEL: 1096 # Job was cancelled, cancel this job as well 1097 job.Cancel() 1098 assert op.status == constants.OP_STATUS_CANCELING 1099 break 1100 1101 elif depresult in (_JobDependencyManager.WRONGSTATUS, 1102 _JobDependencyManager.ERROR): 1103 # Job failed or there was an error, this job must fail 1104 op.status = constants.OP_STATUS_ERROR 1105 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 1106 break 1107 1108 else: 1109 raise errors.ProgrammerError("Unknown dependency result '%s'" % 1110 depresult) 1111 1112 return result
1113
1114 - def _ExecOpCodeUnlocked(self, opctx):
1115 """Processes one opcode and returns the result. 1116 1117 """ 1118 op = opctx.op 1119 1120 assert op.status in (constants.OP_STATUS_WAITING, 1121 constants.OP_STATUS_CANCELING) 1122 1123 # The very last check if the job was cancelled before trying to execute 1124 if op.status == constants.OP_STATUS_CANCELING: 1125 return (constants.OP_STATUS_CANCELING, None) 1126 1127 timeout = opctx.GetNextLockTimeout() 1128 1129 try: 1130 # Make sure not to hold queue lock while calling ExecOpCode 1131 result = self.opexec_fn(op.input, 1132 _OpExecCallbacks(self.queue, self.job, op), 1133 timeout=timeout) 1134 except mcpu.LockAcquireTimeout: 1135 assert timeout is not None, "Received timeout for blocking acquire" 1136 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1137 1138 assert op.status in (constants.OP_STATUS_WAITING, 1139 constants.OP_STATUS_CANCELING) 1140 1141 # Was job cancelled while we were waiting for the lock? 1142 if op.status == constants.OP_STATUS_CANCELING: 1143 return (constants.OP_STATUS_CANCELING, None) 1144 1145 # Queue is shutting down, return to queued 1146 if not self.queue.AcceptingJobsUnlocked(): 1147 return (constants.OP_STATUS_QUEUED, None) 1148 1149 # Stay in waitlock while trying to re-acquire lock 1150 return (constants.OP_STATUS_WAITING, None) 1151 except CancelJob: 1152 logging.exception("%s: Canceling job", opctx.log_prefix) 1153 assert op.status == constants.OP_STATUS_CANCELING 1154 return (constants.OP_STATUS_CANCELING, None) 1155 1156 except QueueShutdown: 1157 logging.exception("%s: Queue is shutting down", opctx.log_prefix) 1158 1159 assert op.status == constants.OP_STATUS_WAITING 1160 1161 # Job hadn't been started yet, so it should return to the queue 1162 return (constants.OP_STATUS_QUEUED, None) 1163 1164 except Exception, err: # pylint: disable=W0703 1165 logging.exception("%s: Caught exception in %s", 1166 opctx.log_prefix, opctx.summary) 1167 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1168 else: 1169 logging.debug("%s: %s successful", 1170 opctx.log_prefix, opctx.summary) 1171 return (constants.OP_STATUS_SUCCESS, result)
1172
1173 - def __call__(self, _nextop_fn=None):
1174 """Continues execution of a job. 1175 1176 @param _nextop_fn: Callback function for tests 1177 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 1178 be deferred and C{WAITDEP} if the dependency manager 1179 (L{_JobDependencyManager}) will re-schedule the job when appropriate 1180 1181 """ 1182 queue = self.queue 1183 job = self.job 1184 1185 logging.debug("Processing job %s", job.id) 1186 1187 queue.acquire(shared=1) 1188 try: 1189 opcount = len(job.ops) 1190 1191 assert job.writable, "Expected writable job" 1192 1193 # Don't do anything for finalized jobs 1194 if job.CalcStatus() in constants.JOBS_FINALIZED: 1195 return self.FINISHED 1196 1197 # Is a previous opcode still pending? 1198 if job.cur_opctx: 1199 opctx = job.cur_opctx 1200 job.cur_opctx = None 1201 else: 1202 if __debug__ and _nextop_fn: 1203 _nextop_fn() 1204 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 1205 1206 op = opctx.op 1207 1208 # Consistency check 1209 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 1210 constants.OP_STATUS_CANCELING) 1211 for i in job.ops[opctx.index + 1:]) 1212 1213 assert op.status in (constants.OP_STATUS_QUEUED, 1214 constants.OP_STATUS_WAITING, 1215 constants.OP_STATUS_CANCELING) 1216 1217 assert (op.priority <= constants.OP_PRIO_LOWEST and 1218 op.priority >= constants.OP_PRIO_HIGHEST) 1219 1220 waitjob = None 1221 1222 if op.status != constants.OP_STATUS_CANCELING: 1223 assert op.status in (constants.OP_STATUS_QUEUED, 1224 constants.OP_STATUS_WAITING) 1225 1226 # Prepare to start opcode 1227 if self._MarkWaitlock(job, op): 1228 # Write to disk 1229 queue.UpdateJobUnlocked(job) 1230 1231 assert op.status == constants.OP_STATUS_WAITING 1232 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1233 assert job.start_timestamp and op.start_timestamp 1234 assert waitjob is None 1235 1236 # Check if waiting for a job is necessary 1237 waitjob = self._CheckDependencies(queue, job, opctx) 1238 1239 assert op.status in (constants.OP_STATUS_WAITING, 1240 constants.OP_STATUS_CANCELING, 1241 constants.OP_STATUS_ERROR) 1242 1243 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1244 constants.OP_STATUS_ERROR)): 1245 logging.info("%s: opcode %s waiting for locks", 1246 opctx.log_prefix, opctx.summary) 1247 1248 assert not opctx.jobdeps, "Not all dependencies were removed" 1249 1250 queue.release() 1251 try: 1252 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1253 finally: 1254 queue.acquire(shared=1) 1255 1256 op.status = op_status 1257 op.result = op_result 1258 1259 assert not waitjob 1260 1261 if op.status in (constants.OP_STATUS_WAITING, 1262 constants.OP_STATUS_QUEUED): 1263 # waiting: Couldn't get locks in time 1264 # queued: Queue is shutting down 1265 assert not op.end_timestamp 1266 else: 1267 # Finalize opcode 1268 op.end_timestamp = TimeStampNow() 1269 1270 if op.status == constants.OP_STATUS_CANCELING: 1271 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1272 for i in job.ops[opctx.index:]) 1273 else: 1274 assert op.status in constants.OPS_FINALIZED 1275 1276 if op.status == constants.OP_STATUS_QUEUED: 1277 # Queue is shutting down 1278 assert not waitjob 1279 1280 finalize = False 1281 1282 # Reset context 1283 job.cur_opctx = None 1284 1285 # In no case must the status be finalized here 1286 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1287 1288 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1289 finalize = False 1290 1291 if not waitjob and 
opctx.CheckPriorityIncrease(): 1292 # Priority was changed, need to update on-disk file 1293 queue.UpdateJobUnlocked(job) 1294 1295 # Keep around for another round 1296 job.cur_opctx = opctx 1297 1298 assert (op.priority <= constants.OP_PRIO_LOWEST and 1299 op.priority >= constants.OP_PRIO_HIGHEST) 1300 1301 # In no case must the status be finalized here 1302 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1303 1304 else: 1305 # Ensure all opcodes so far have been successful 1306 assert (opctx.index == 0 or 1307 compat.all(i.status == constants.OP_STATUS_SUCCESS 1308 for i in job.ops[:opctx.index])) 1309 1310 # Reset context 1311 job.cur_opctx = None 1312 1313 if op.status == constants.OP_STATUS_SUCCESS: 1314 finalize = False 1315 1316 elif op.status == constants.OP_STATUS_ERROR: 1317 # Ensure failed opcode has an exception as its result 1318 assert errors.GetEncodedError(job.ops[opctx.index].result) 1319 1320 to_encode = errors.OpExecError("Preceding opcode failed") 1321 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1322 _EncodeOpError(to_encode)) 1323 finalize = True 1324 1325 # Consistency check 1326 assert compat.all(i.status == constants.OP_STATUS_ERROR and 1327 errors.GetEncodedError(i.result) 1328 for i in job.ops[opctx.index:]) 1329 1330 elif op.status == constants.OP_STATUS_CANCELING: 1331 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1332 "Job canceled by request") 1333 finalize = True 1334 1335 else: 1336 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1337 1338 if opctx.index == (opcount - 1): 1339 # Finalize on last opcode 1340 finalize = True 1341 1342 if finalize: 1343 # All opcodes have been run, finalize job 1344 job.Finalize() 1345 1346 # Write to disk. If the job status is final, this is the final write 1347 # allowed. Once the file has been written, it can be archived anytime. 1348 queue.UpdateJobUnlocked(job) 1349 1350 assert not waitjob 1351 1352 if finalize: 1353 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1354 return self.FINISHED 1355 1356 assert not waitjob or queue.depmgr.JobWaiting(job) 1357 1358 if waitjob: 1359 return self.WAITDEP 1360 else: 1361 return self.DEFER 1362 finally: 1363 assert job.writable, "Job became read-only while being processed" 1364 queue.release()
1365
1366 1367 -def _EvaluateJobProcessorResult(depmgr, job, result):
1368 """Looks at a result from L{_JobProcessor} for a job. 1369 1370 To be used in a L{_JobQueueWorker}. 1371 1372 """ 1373 if result == _JobProcessor.FINISHED: 1374 # Notify waiting jobs 1375 depmgr.NotifyWaiters(job.id) 1376 1377 elif result == _JobProcessor.DEFER: 1378 # Schedule again 1379 raise workerpool.DeferTask(priority=job.CalcPriority()) 1380 1381 elif result == _JobProcessor.WAITDEP: 1382 # No-op, dependency manager will re-schedule 1383 pass 1384 1385 else: 1386 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1387 (result, ))
1388
1389 1390 -class _JobQueueWorker(workerpool.BaseWorker):
1391 """The actual job workers. 1392 1393 """
1394 - def RunTask(self, job): # pylint: disable=W0221
1395 """Job executor. 1396 1397 @type job: L{_QueuedJob} 1398 @param job: the job to be processed 1399 1400 """ 1401 assert job.writable, "Expected writable job" 1402 1403 # Ensure only one worker is active on a single job. If a job registers for 1404 # a dependency job, and the other job notifies before the first worker is 1405 # done, the job can end up in the tasklist more than once. 1406 job.processor_lock.acquire() 1407 try: 1408 return self._RunTaskInner(job) 1409 finally: 1410 job.processor_lock.release()
1411
1412 - def _RunTaskInner(self, job):
1413 """Executes a job. 1414 1415 Must be called with per-job lock acquired. 1416 1417 """ 1418 queue = job.queue 1419 assert queue == self.pool.queue 1420 1421 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1422 setname_fn(None) 1423 1424 proc = mcpu.Processor(queue.context, job.id) 1425 1426 # Create wrapper for setting thread name 1427 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1428 proc.ExecOpCode) 1429 1430 _EvaluateJobProcessorResult(queue.depmgr, job, 1431 _JobProcessor(queue, wrap_execop_fn, job)())
1432 1433 @staticmethod
1434 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1435 """Updates the worker thread name to include a short summary of the opcode. 1436 1437 @param setname_fn: Callable setting worker thread name 1438 @param execop_fn: Callable for executing opcode (usually 1439 L{mcpu.Processor.ExecOpCode}) 1440 1441 """ 1442 setname_fn(op) 1443 try: 1444 return execop_fn(op, *args, **kwargs) 1445 finally: 1446 setname_fn(None)
1447 1448 @staticmethod
1449 - def _GetWorkerName(job, op):
1450 """Sets the worker thread name. 1451 1452 @type job: L{_QueuedJob} 1453 @type op: L{opcodes.OpCode} 1454 1455 """ 1456 parts = ["Job%s" % job.id] 1457 1458 if op: 1459 parts.append(op.TinySummary()) 1460 1461 return "/".join(parts)
1462
1463 1464 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1465 """Simple class implementing a job-processing workerpool. 1466 1467 """
1468 - def __init__(self, queue):
1469 super(_JobQueueWorkerPool, self).__init__("Jq", 1470 JOBQUEUE_THREADS, 1471 _JobQueueWorker) 1472 self.queue = queue
1473
1474 1475 -class _JobDependencyManager:
1476 """Keeps track of job dependencies. 1477 1478 """ 1479 (WAIT, 1480 ERROR, 1481 CANCEL, 1482 CONTINUE, 1483 WRONGSTATUS) = range(1, 6) 1484
1485 - def __init__(self, getstatus_fn, enqueue_fn):
1486 """Initializes this class. 1487 1488 """ 1489 self._getstatus_fn = getstatus_fn 1490 self._enqueue_fn = enqueue_fn 1491 1492 self._waiters = {} 1493 self._lock = locking.SharedLock("JobDepMgr")
1494 1495 @locking.ssynchronized(_LOCK, shared=1)
1496 - def GetLockInfo(self, requested): # pylint: disable=W0613
1497 """Retrieves information about waiting jobs. 1498 1499 @type requested: set 1500 @param requested: Requested information, see C{query.LQ_*} 1501 1502 """ 1503 # No need to sort here, that's being done by the lock manager and query 1504 # library. There are no priorities for notifying jobs, hence all show up as 1505 # one item under "pending". 1506 return [("job/%s" % job_id, None, None, 1507 [("job", [job.id for job in waiters])]) 1508 for job_id, waiters in self._waiters.items() 1509 if waiters]
1510 1511 @locking.ssynchronized(_LOCK, shared=1)
1512 - def JobWaiting(self, job):
1513 """Checks if a job is waiting. 1514 1515 """ 1516 return compat.any(job in jobs 1517 for jobs in self._waiters.values())
1518 1519 @locking.ssynchronized(_LOCK)
1520 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1521 """Checks if a dependency job has the requested status. 1522 1523 If the other job is not yet in a finalized status, the calling job will be 1524 notified (re-added to the workerpool) at a later point. 1525 1526 @type job: L{_QueuedJob} 1527 @param job: Job object 1528 @type dep_job_id: int 1529 @param dep_job_id: ID of dependency job 1530 @type dep_status: list 1531 @param dep_status: Required status 1532 1533 """ 1534 assert ht.TJobId(job.id) 1535 assert ht.TJobId(dep_job_id) 1536 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1537 1538 if job.id == dep_job_id: 1539 return (self.ERROR, "Job can't depend on itself") 1540 1541 # Get status of dependency job 1542 try: 1543 status = self._getstatus_fn(dep_job_id) 1544 except errors.JobLost, err: 1545 return (self.ERROR, "Dependency error: %s" % err) 1546 1547 assert status in constants.JOB_STATUS_ALL 1548 1549 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1550 1551 if status not in constants.JOBS_FINALIZED: 1552 # Register for notification and wait for job to finish 1553 job_id_waiters.add(job) 1554 return (self.WAIT, 1555 "Need to wait for job %s, wanted status '%s'" % 1556 (dep_job_id, dep_status)) 1557 1558 # Remove from waiters list 1559 if job in job_id_waiters: 1560 job_id_waiters.remove(job) 1561 1562 if (status == constants.JOB_STATUS_CANCELED and 1563 constants.JOB_STATUS_CANCELED not in dep_status): 1564 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1565 1566 elif not dep_status or status in dep_status: 1567 return (self.CONTINUE, 1568 "Dependency job %s finished with status '%s'" % 1569 (dep_job_id, status)) 1570 1571 else: 1572 return (self.WRONGSTATUS, 1573 "Dependency job %s finished with status '%s'," 1574 " not one of '%s' as required" % 1575 (dep_job_id, status, utils.CommaJoin(dep_status)))
1576
1577 - def _RemoveEmptyWaitersUnlocked(self):
1578 """Remove all jobs without actual waiters. 1579 1580 """ 1581 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1582 if not waiters]: 1583 del self._waiters[job_id]
1584
1585 - def NotifyWaiters(self, job_id):
1586 """Notifies all jobs waiting for a certain job ID. 1587 1588 @attention: Do not call until L{CheckAndRegister} returned a status other 1589 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1590 @type job_id: int 1591 @param job_id: Job ID 1592 1593 """ 1594 assert ht.TJobId(job_id) 1595 1596 self._lock.acquire() 1597 try: 1598 self._RemoveEmptyWaitersUnlocked() 1599 1600 jobs = self._waiters.pop(job_id, None) 1601 finally: 1602 self._lock.release() 1603 1604 if jobs: 1605 # Re-add jobs to workerpool 1606 logging.debug("Re-adding %s jobs which were waiting for job %s", 1607 len(jobs), job_id) 1608 self._enqueue_fn(jobs)
1609
1610 1611 -def _RequireOpenQueue(fn):
1612 """Decorator for "public" functions. 1613 1614 This function should be used for all 'public' functions. That is, 1615 functions usually called from other classes. Note that this should 1616 be applied only to methods (not plain functions), since it expects 1617 that the decorated function is called with a first argument that has 1618 a '_queue_filelock' attribute. 1619 1620 @warning: Use this decorator only after locking.ssynchronized 1621 1622 Example:: 1623 @locking.ssynchronized(_LOCK) 1624 @_RequireOpenQueue 1625 def Example(self): 1626 pass 1627 1628 """ 1629 def wrapper(self, *args, **kwargs): 1630 # pylint: disable=W0212 1631 assert self._queue_filelock is not None, "Queue should be open" 1632 return fn(self, *args, **kwargs)
1633 return wrapper 1634
1635 1636 -def _RequireNonDrainedQueue(fn):
1637 """Decorator checking for a non-drained queue. 1638 1639 To be used with functions submitting new jobs. 1640 1641 """ 1642 def wrapper(self, *args, **kwargs): 1643 """Wrapper function. 1644 1645 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1646 1647 """ 1648 # Ok when sharing the big job queue lock, as the drain file is created when 1649 # the lock is exclusive. 1650 # Needs access to protected member, pylint: disable=W0212 1651 if self._drained: 1652 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1653 1654 if not self._accepting_jobs: 1655 raise errors.JobQueueError("Job queue is shutting down, refusing job") 1656 1657 return fn(self, *args, **kwargs)
1658 return wrapper 1659
1660 1661 -class JobQueue(object):
1662 """Queue used to manage the jobs. 1663 1664 """
1665 - def __init__(self, context):
1666 """Constructor for JobQueue. 1667 1668 The constructor will initialize the job queue object and then 1669 start loading the current jobs from disk, either for starting them 1670 (if they were queued) or for aborting them (if they were already 1671 running). 1672 1673 @type context: GanetiContext 1674 @param context: the context object for access to the configuration 1675 data and other ganeti objects 1676 1677 """ 1678 self.context = context 1679 self._memcache = weakref.WeakValueDictionary() 1680 self._my_hostname = netutils.Hostname.GetSysName() 1681 1682 # The Big JobQueue lock. If a code block or method acquires it in shared 1683 # mode, it must guarantee concurrency with all the code acquiring it in 1684 # shared mode, including itself. In order not to acquire it at all 1685 # concurrency must be guaranteed with all code acquiring it in shared mode 1686 # and all code acquiring it exclusively. 1687 self._lock = locking.SharedLock("JobQueue") 1688 1689 self.acquire = self._lock.acquire 1690 self.release = self._lock.release 1691 1692 # Accept jobs by default 1693 self._accepting_jobs = True 1694 1695 # Initialize the queue, and acquire the filelock. 1696 # This ensures no other process is working on the job queue. 1697 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 1698 1699 # Read serial file 1700 self._last_serial = jstore.ReadSerial() 1701 assert self._last_serial is not None, ("Serial file was modified between" 1702 " check in jstore and here") 1703 1704 # Get initial list of nodes 1705 self._nodes = dict((n.name, n.primary_ip) 1706 for n in self.context.cfg.GetAllNodesInfo().values() 1707 if n.master_candidate) 1708 1709 # Remove master node 1710 self._nodes.pop(self._my_hostname, None) 1711 1712 # TODO: Check consistency across nodes 1713 1714 self._queue_size = None 1715 self._UpdateQueueSizeUnlocked() 1716 assert ht.TInt(self._queue_size) 1717 self._drained = jstore.CheckDrainFlag() 1718 1719 # Job dependencies 1720 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1721 self._EnqueueJobs) 1722 self.context.glm.AddToLockMonitor(self.depmgr) 1723 1724 # Setup worker pool 1725 self._wpool = _JobQueueWorkerPool(self) 1726 try: 1727 self._InspectQueue() 1728 except: 1729 self._wpool.TerminateWorkers() 1730 raise
1731 1732 @locking.ssynchronized(_LOCK) 1733 @_RequireOpenQueue
1734 - def _InspectQueue(self):
1735 """Loads the whole job queue and resumes unfinished jobs. 1736 1737 This function needs the lock here because WorkerPool.AddTask() may start a 1738 job while we're still doing our work. 1739 1740 """ 1741 logging.info("Inspecting job queue") 1742 1743 restartjobs = [] 1744 1745 all_job_ids = self._GetJobIDsUnlocked() 1746 jobs_count = len(all_job_ids) 1747 lastinfo = time.time() 1748 for idx, job_id in enumerate(all_job_ids): 1749 # Give an update every 1000 jobs or 10 seconds 1750 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or 1751 idx == (jobs_count - 1)): 1752 logging.info("Job queue inspection: %d/%d (%0.1f %%)", 1753 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) 1754 lastinfo = time.time() 1755 1756 job = self._LoadJobUnlocked(job_id) 1757 1758 # a failure in loading the job can cause 'None' to be returned 1759 if job is None: 1760 continue 1761 1762 status = job.CalcStatus() 1763 1764 if status == constants.JOB_STATUS_QUEUED: 1765 restartjobs.append(job) 1766 1767 elif status in (constants.JOB_STATUS_RUNNING, 1768 constants.JOB_STATUS_WAITING, 1769 constants.JOB_STATUS_CANCELING): 1770 logging.warning("Unfinished job %s found: %s", job.id, job) 1771 1772 if status == constants.JOB_STATUS_WAITING: 1773 # Restart job 1774 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1775 restartjobs.append(job) 1776 else: 1777 to_encode = errors.OpExecError("Unclean master daemon shutdown") 1778 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1779 _EncodeOpError(to_encode)) 1780 job.Finalize() 1781 1782 self.UpdateJobUnlocked(job) 1783 1784 if restartjobs: 1785 logging.info("Restarting %s jobs", len(restartjobs)) 1786 self._EnqueueJobsUnlocked(restartjobs) 1787 1788 logging.info("Job queue inspection finished")
1789
1790 - def _GetRpc(self, address_list):
1791 """Gets RPC runner with context. 1792 1793 """ 1794 return rpc.JobQueueRunner(self.context, address_list)
1795 1796 @locking.ssynchronized(_LOCK) 1797 @_RequireOpenQueue
1798 - def AddNode(self, node):
1799 """Register a new node with the queue. 1800 1801 @type node: L{objects.Node} 1802 @param node: the node object to be added 1803 1804 """ 1805 node_name = node.name 1806 assert node_name != self._my_hostname 1807 1808 # Clean queue directory on added node 1809 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1810 msg = result.fail_msg 1811 if msg: 1812 logging.warning("Cannot cleanup queue directory on node %s: %s", 1813 node_name, msg) 1814 1815 if not node.master_candidate: 1816 # remove if existing, ignoring errors 1817 self._nodes.pop(node_name, None) 1818 # and skip the replication of the job ids 1819 return 1820 1821 # Upload the whole queue excluding archived jobs 1822 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1823 1824 # Upload current serial file 1825 files.append(pathutils.JOB_QUEUE_SERIAL_FILE) 1826 1827 # Static address list 1828 addrs = [node.primary_ip] 1829 1830 for file_name in files: 1831 # Read file content 1832 content = utils.ReadFile(file_name) 1833 1834 result = _CallJqUpdate(self._GetRpc(addrs), [node_name], 1835 file_name, content) 1836 msg = result[node_name].fail_msg 1837 if msg: 1838 logging.error("Failed to upload file %s to node %s: %s", 1839 file_name, node_name, msg) 1840 1841 # Set queue drained flag 1842 result = \ 1843 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], 1844 self._drained) 1845 msg = result[node_name].fail_msg 1846 if msg: 1847 logging.error("Failed to set queue drained flag on node %s: %s", 1848 node_name, msg) 1849 1850 self._nodes[node_name] = node.primary_ip
1851 1852 @locking.ssynchronized(_LOCK) 1853 @_RequireOpenQueue
1854 - def RemoveNode(self, node_name):
1855 """Callback called when removing nodes from the cluster. 1856 1857 @type node_name: str 1858 @param node_name: the name of the node to remove 1859 1860 """ 1861 self._nodes.pop(node_name, None)
1862 1863 @staticmethod
1864 - def _CheckRpcResult(result, nodes, failmsg):
1865 """Verifies the status of an RPC call. 1866 1867 Since we aim to keep consistency should this node (the current 1868 master) fail, we will log errors if our rpc calls fail, and especially 1869 log the case when more than half of the nodes fail. 1870 1871 @param result: the data as returned from the rpc call 1872 @type nodes: list 1873 @param nodes: the list of nodes we made the call to 1874 @type failmsg: str 1875 @param failmsg: the identifier to be used for logging 1876 1877 """ 1878 failed = [] 1879 success = [] 1880 1881 for node in nodes: 1882 msg = result[node].fail_msg 1883 if msg: 1884 failed.append(node) 1885 logging.error("RPC call %s (%s) failed on node %s: %s", 1886 result[node].call, failmsg, node, msg) 1887 else: 1888 success.append(node) 1889 1890 # +1 for the master node 1891 if (len(success) + 1) < len(failed): 1892 # TODO: Handle failing nodes 1893 logging.error("More than half of the nodes failed")
1894
1895 - def _GetNodeIp(self):
1896 """Helper for returning the node name/ip list. 1897 1898 @rtype: (list, list) 1899 @return: a tuple of two lists, the first one with the node 1900 names and the second one with the node addresses 1901 1902 """ 1903 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1904 name_list = self._nodes.keys() 1905 addr_list = [self._nodes[name] for name in name_list] 1906 return name_list, addr_list
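A small sketch of the equivalence suggested by the TODO comment above; the sample node mapping is hypothetical:

  nodes = {"node1.example.com": "192.0.2.1", "node2.example.com": "192.0.2.2"}

  # Current approach: keys first, then the matching addresses
  name_list = list(nodes.keys())
  addr_list = [nodes[name] for name in name_list]

  # Form from the TODO comment: transpose the (name, address) pairs
  names2, addrs2 = tuple(map(list, zip(*nodes.items())))

  assert (name_list, addr_list) == (names2, addrs2)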
1907
1908 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1909 """Writes a file locally and then replicates it to all nodes. 1910 1911 This function will replace the contents of a file on the local 1912 node and then replicate it to all the other nodes we have. 1913 1914 @type file_name: str 1915 @param file_name: the path of the file to be replicated 1916 @type data: str 1917 @param data: the new contents of the file 1918 @type replicate: boolean 1919 @param replicate: whether to spread the changes to the remote nodes 1920 1921 """ 1922 getents = runtime.GetEnts() 1923 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1924 gid=getents.daemons_gid, 1925 mode=constants.JOB_QUEUE_FILES_PERMS) 1926 1927 if replicate: 1928 names, addrs = self._GetNodeIp() 1929 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1930 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1931
1932 - def _RenameFilesUnlocked(self, rename):
1933 """Renames files locally and then replicates the changes. 1934 1935 This function will rename the given files in the local queue directory 1936 and then replicate these renames to all the other nodes we have. 1937 1938 @type rename: list of (old, new) 1939 @param rename: List containing tuples mapping old to new names 1940 1941 """ 1942 # Rename them locally 1943 for old, new in rename: 1944 utils.RenameFile(old, new, mkdir=True) 1945 1946 # ... and on all nodes 1947 names, addrs = self._GetNodeIp() 1948 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1949 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1950
1951 - def _NewSerialsUnlocked(self, count):
1952 """Generates new job identifiers. 1953 1954 Job identifiers are unique during the lifetime of a cluster. 1955 1956 @type count: integer 1957 @param count: how many serials to return 1958 @rtype: list of int 1959 @return: a list of job identifiers. 1960 1961 """ 1962 assert ht.TNonNegativeInt(count) 1963 1964 # New number 1965 serial = self._last_serial + count 1966 1967 # Write to file 1968 self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE, 1969 "%s\n" % serial, True) 1970 1971 result = [jstore.FormatJobID(v) 1972 for v in range(self._last_serial + 1, serial + 1)] 1973 1974 # Keep it only if we were able to write the file 1975 self._last_serial = serial 1976 1977 assert len(result) == count 1978 1979 return result
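A worked example of the serial arithmetic above (values are hypothetical); jstore.FormatJobID is assumed to turn each serial into a job ID:

  last_serial = 41
  count = 3

  serial = last_serial + count                       # 44, written to the serial file
  new_serials = range(last_serial + 1, serial + 1)   # serials 42, 43, 44
  assert list(new_serials) == [42, 43, 44]
  assert len(list(new_serials)) == count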
1980 1981 @staticmethod
1982 - def _GetJobPath(job_id):
1983 """Returns the job file for a given job id. 1984 1985 @type job_id: str 1986 @param job_id: the job identifier 1987 @rtype: str 1988 @return: the path to the job file 1989 1990 """ 1991 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1992 1993 @staticmethod
1994 - def _GetArchivedJobPath(job_id):
1995 """Returns the archived job file for a given job id. 1996 1997 @type job_id: str 1998 @param job_id: the job identifier 1999 @rtype: str 2000 @return: the path to the archived job file 2001 2002 """ 2003 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 2004 jstore.GetArchiveDirectory(job_id), 2005 "job-%s" % job_id)
2006 2007 @staticmethod
2008 - def _DetermineJobDirectories(archived):
2009 """Build list of directories containing job files. 2010 2011 @type archived: bool 2012 @param archived: Whether to include directories for archived jobs 2013 @rtype: list 2014 2015 """ 2016 result = [pathutils.QUEUE_DIR] 2017 2018 if archived: 2019 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 2020 result.extend(map(compat.partial(utils.PathJoin, archive_path), 2021 utils.ListVisibleFiles(archive_path))) 2022 2023 return result
2024 2025 @classmethod
2026 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2027 """Return all known job IDs. 2028 2029 The method only looks at disk because it's a requirement that all 2030 jobs are present on disk (so in the _memcache we don't have any 2031 extra IDs). 2032 2033 @type sort: boolean 2034 @param sort: perform sorting on the returned job ids 2035 @rtype: list 2036 @return: the list of job IDs 2037 2038 """ 2039 jlist = [] 2040 2041 for path in cls._DetermineJobDirectories(archived): 2042 for filename in utils.ListVisibleFiles(path): 2043 m = constants.JOB_FILE_RE.match(filename) 2044 if m: 2045 jlist.append(int(m.group(1))) 2046 2047 if sort: 2048 jlist.sort() 2049 return jlist
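For illustration, the file-name matching performed above, assuming constants.JOB_FILE_RE is equivalent to the hypothetical pattern below:

  import re

  job_file_re = re.compile(r"^job-(\d+)$")   # stand-in for constants.JOB_FILE_RE

  filenames = ["job-12", "job-7", "serial", "job-12.tmp"]
  jlist = sorted(int(m.group(1))
                 for m in map(job_file_re.match, filenames) if m)
  assert jlist == [7, 12]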
2050
2051 - def _LoadJobUnlocked(self, job_id):
2052 """Loads a job from the disk or memory. 2053 2054 Given a job id, this will return the cached job object if 2055 existing, or try to load the job from the disk. If loading from 2056 disk, it will also add the job to the cache. 2057 2058 @type job_id: int 2059 @param job_id: the job id 2060 @rtype: L{_QueuedJob} or None 2061 @return: either None or the job object 2062 2063 """ 2064 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 2065 2066 job = self._memcache.get(job_id, None) 2067 if job: 2068 logging.debug("Found job %s in memcache", job_id) 2069 assert job.writable, "Found read-only job in memcache" 2070 return job 2071 2072 try: 2073 job = self._LoadJobFromDisk(job_id, False) 2074 if job is None: 2075 return job 2076 except errors.JobFileCorrupted: 2077 old_path = self._GetJobPath(job_id) 2078 new_path = self._GetArchivedJobPath(job_id) 2079 if old_path == new_path: 2080 # job already archived (future case) 2081 logging.exception("Can't parse job %s", job_id) 2082 else: 2083 # non-archived case 2084 logging.exception("Can't parse job %s, will archive.", job_id) 2085 self._RenameFilesUnlocked([(old_path, new_path)]) 2086 return None 2087 2088 assert job.writable, "Job just loaded is not writable" 2089 2090 self._memcache[job_id] = job 2091 logging.debug("Added job %s to the cache", job_id) 2092 return job
2093
2094 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2095 """Load the given job file from disk. 2096 2097 Given a job file, read, load and restore it in a _QueuedJob format. 2098 2099 @type job_id: int 2100 @param job_id: job identifier 2101 @type try_archived: bool 2102 @param try_archived: Whether to try loading an archived job 2103 @rtype: L{_QueuedJob} or None 2104 @return: either None or the job object 2105 2106 """ 2107 path_functions = [(self._GetJobPath, False)] 2108 2109 if try_archived: 2110 path_functions.append((self._GetArchivedJobPath, True)) 2111 2112 raw_data = None 2113 archived = None 2114 2115 for (fn, archived) in path_functions: 2116 filepath = fn(job_id) 2117 logging.debug("Loading job from %s", filepath) 2118 try: 2119 raw_data = utils.ReadFile(filepath) 2120 except EnvironmentError, err: 2121 if err.errno != errno.ENOENT: 2122 raise 2123 else: 2124 break 2125 2126 if not raw_data: 2127 return None 2128 2129 if writable is None: 2130 writable = not archived 2131 2132 try: 2133 data = serializer.LoadJson(raw_data) 2134 job = _QueuedJob.Restore(self, data, writable, archived) 2135 except Exception, err: # pylint: disable=W0703 2136 raise errors.JobFileCorrupted(err) 2137 2138 return job
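The method above tries the live queue path first and, when requested, the archive path, stopping at the first file it can read. A self-contained sketch of that fall-back pattern (file handling is simplified and the helper name is hypothetical):

  import errno

  def read_first_existing(paths):
    # Return (path, contents) for the first readable path; missing files
    # are skipped, any other error is propagated.
    for path in paths:
      try:
        with open(path) as fd:
          return path, fd.read()
      except EnvironmentError as err:
        if err.errno != errno.ENOENT:
          raise
    return None, None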
2139
2140 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2141 """Load the given job file from disk. 2142 2143 Given a job file, read, load and restore it in a _QueuedJob format. 2144 If the job cannot be read or parsed, None is returned and the 2145 exception is logged. 2146 2147 @type job_id: int 2148 @param job_id: job identifier 2149 @type try_archived: bool 2150 @param try_archived: Whether to try loading an archived job 2151 @rtype: L{_QueuedJob} or None 2152 @return: either None or the job object 2153 2154 """ 2155 try: 2156 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 2157 except (errors.JobFileCorrupted, EnvironmentError): 2158 logging.exception("Can't load/parse job %s", job_id) 2159 return None
2160
2161 - def _UpdateQueueSizeUnlocked(self):
2162 """Update the queue size. 2163 2164 """ 2165 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2166 2167 @locking.ssynchronized(_LOCK) 2168 @_RequireOpenQueue
2169 - def SetDrainFlag(self, drain_flag):
2170 """Sets the drain flag for the queue. 2171 2172 @type drain_flag: boolean 2173 @param drain_flag: Whether to set or unset the drain flag 2174 2175 """ 2176 # Change flag locally 2177 jstore.SetDrainFlag(drain_flag) 2178 2179 self._drained = drain_flag 2180 2181 # ... and on all nodes 2182 (names, addrs) = self._GetNodeIp() 2183 result = \ 2184 self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) 2185 self._CheckRpcResult(result, self._nodes, 2186 "Setting queue drain flag to %s" % drain_flag) 2187 2188 return True
2189 2190 @_RequireOpenQueue
2191 - def _SubmitJobUnlocked(self, job_id, ops):
2192 """Create and store a new job. 2193 2194 This enters the job into our job queue and also puts it on the new 2195 queue, in order for it to be picked up by the queue processors. 2196 2197 @type job_id: job ID 2198 @param job_id: the job ID for the new job 2199 @type ops: list 2200 @param ops: The list of OpCodes that will become the new job. 2201 @rtype: L{_QueuedJob} 2202 @return: the job object to be queued 2203 @raise errors.JobQueueFull: if the job queue has too many jobs in it 2204 @raise errors.GenericError: If an opcode is not valid 2205 2206 """ 2207 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: 2208 raise errors.JobQueueFull() 2209 2210 job = _QueuedJob(self, job_id, ops, True) 2211 2212 for idx, op in enumerate(job.ops): 2213 # Check priority 2214 if op.priority not in constants.OP_PRIO_SUBMIT_VALID: 2215 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2216 raise errors.GenericError("Opcode %s has invalid priority %s, allowed" 2217 " are %s" % (idx, op.priority, allowed)) 2218 2219 # Check job dependencies 2220 dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None) 2221 if not opcodes_base.TNoRelativeJobDependencies(dependencies): 2222 raise errors.GenericError("Opcode %s has invalid dependencies, must" 2223 " match %s: %s" % 2224 (idx, opcodes_base.TNoRelativeJobDependencies, 2225 dependencies)) 2226 2227 # Write to disk 2228 self.UpdateJobUnlocked(job) 2229 2230 self._queue_size += 1 2231 2232 logging.debug("Adding new job %s to the cache", job_id) 2233 self._memcache[job_id] = job 2234 2235 return job
2236 2237 @locking.ssynchronized(_LOCK) 2238 @_RequireOpenQueue 2239 @_RequireNonDrainedQueue
2240 - def SubmitJob(self, ops):
2241 """Create and store a new job. 2242 2243 @see: L{_SubmitJobUnlocked} 2244 2245 """ 2246 (job_id, ) = self._NewSerialsUnlocked(1) 2247 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) 2248 return job_id
2249 2250 @locking.ssynchronized(_LOCK) 2251 @_RequireOpenQueue
2252 - def SubmitJobToDrainedQueue(self, ops):
2253 """Forcefully create and store a new job. 2254 2255 Do so even if the job queue is drained. 2256 @see: L{_SubmitJobUnlocked} 2257 2258 """ 2259 (job_id, ) = self._NewSerialsUnlocked(1) 2260 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) 2261 return job_id
2262 2263 @locking.ssynchronized(_LOCK) 2264 @_RequireOpenQueue 2265 @_RequireNonDrainedQueue
2266 - def SubmitManyJobs(self, jobs):
2267 """Create and store multiple jobs. 2268 2269 @see: L{_SubmitJobUnlocked} 2270 2271 """ 2272 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 2273 2274 (results, added_jobs) = \ 2275 self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) 2276 2277 self._EnqueueJobsUnlocked(added_jobs) 2278 2279 return results
2280 2281 @staticmethod
2282 - def _FormatSubmitError(msg, ops):
2283 """Formats errors which occurred while submitting a job. 2284 2285 """ 2286 return ("%s; opcodes %s" % 2287 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2288 2289 @staticmethod
2290 - def _ResolveJobDependencies(resolve_fn, deps):
2291 """Resolves relative job IDs in dependencies. 2292 2293 @type resolve_fn: callable 2294 @param resolve_fn: Function to resolve a relative job ID 2295 @type deps: list 2296 @param deps: Dependencies 2297 @rtype: tuple; (boolean, string or list) 2298 @return: If successful (first tuple item), the returned list contains 2299 resolved job IDs along with the requested status; if not successful, 2300 the second element is an error message 2301 2302 """ 2303 result = [] 2304 2305 for (dep_job_id, dep_status) in deps: 2306 if ht.TRelativeJobId(dep_job_id): 2307 assert ht.TInt(dep_job_id) and dep_job_id < 0 2308 try: 2309 job_id = resolve_fn(dep_job_id) 2310 except IndexError: 2311 # Abort 2312 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 2313 else: 2314 job_id = dep_job_id 2315 2316 result.append((job_id, dep_status)) 2317 2318 return (True, result)
2319
2320 - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2321 """Create and store multiple jobs. 2322 2323 @see: L{_SubmitJobUnlocked} 2324 2325 """ 2326 results = [] 2327 added_jobs = [] 2328 2329 def resolve_fn(job_idx, reljobid): 2330 assert reljobid < 0 2331 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2332 2333 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): 2334 for op in ops: 2335 if getattr(op, opcodes_base.DEPEND_ATTR, None): 2336 (status, data) = \ 2337 self._ResolveJobDependencies(compat.partial(resolve_fn, idx), 2338 op.depends) 2339 if not status: 2340 # Abort resolving dependencies 2341 assert ht.TNonEmptyString(data), "No error message" 2342 break 2343 # Use resolved dependencies 2344 op.depends = data 2345 else: 2346 try: 2347 job = self._SubmitJobUnlocked(job_id, ops) 2348 except errors.GenericError, err: 2349 status = False 2350 data = self._FormatSubmitError(str(err), ops) 2351 else: 2352 status = True 2353 data = job_id 2354 added_jobs.append(job) 2355 2356 results.append((status, data)) 2357 2358 return (results, added_jobs)
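A worked example (with hypothetical job IDs) of how the relative dependencies above are resolved: a negative ID counts backwards through the jobs submitted so far in the same request:

  previous_job_ids = []
  job_ids = [100, 101, 102]      # IDs allocated for one SubmitManyJobs call

  def resolve_fn(job_idx, reljobid):
    assert reljobid < 0
    return (previous_job_ids + job_ids[:job_idx])[reljobid]

  # For the third job (index 2), "-1" is the job submitted just before it
  # and "-2" the one before that.
  assert resolve_fn(2, -1) == 101
  assert resolve_fn(2, -2) == 100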
2359 2360 @locking.ssynchronized(_LOCK)
2361 - def _EnqueueJobs(self, jobs):
2362 """Helper function to add jobs to worker pool's queue. 2363 2364 @type jobs: list 2365 @param jobs: List of all jobs 2366 2367 """ 2368 return self._EnqueueJobsUnlocked(jobs)
2369
2370 - def _EnqueueJobsUnlocked(self, jobs):
2371 """Helper function to add jobs to worker pool's queue. 2372 2373 @type jobs: list 2374 @param jobs: List of all jobs 2375 2376 """ 2377 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 2378 self._wpool.AddManyTasks([(job, ) for job in jobs], 2379 priority=[job.CalcPriority() for job in jobs], 2380 task_id=map(_GetIdAttr, jobs))
2381
2382 - def _GetJobStatusForDependencies(self, job_id):
2383 """Gets the status of a job for dependencies. 2384 2385 @type job_id: int 2386 @param job_id: Job ID 2387 @raise errors.JobLost: If job can't be found 2388 2389 """ 2390 # Not using in-memory cache as doing so would require an exclusive lock 2391 2392 # Try to load from disk 2393 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2394 2395 if job: 2396 # pylint: disable=E1101 2397 assert not job.writable, "Got writable job" 2398 return job.CalcStatus() 2399 2400 raise errors.JobLost("Job %s not found" % job_id)
2401 2402 @_RequireOpenQueue
2403 - def UpdateJobUnlocked(self, job, replicate=True):
2404 """Update a job's on disk storage. 2405 2406 After a job has been modified, this function needs to be called in 2407 order to write the changes to disk and replicate them to the other 2408 nodes. 2409 2410 @type job: L{_QueuedJob} 2411 @param job: the changed job 2412 @type replicate: boolean 2413 @param replicate: whether to replicate the change to remote nodes 2414 2415 """ 2416 if __debug__: 2417 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 2418 assert (finalized ^ (job.end_timestamp is None)) 2419 assert job.writable, "Can't update read-only job" 2420 assert not job.archived, "Can't update archived job" 2421 2422 filename = self._GetJobPath(job.id) 2423 data = serializer.DumpJson(job.Serialize()) 2424 logging.debug("Writing job %s to %s", job.id, filename) 2425 self._UpdateJobQueueFile(filename, data, replicate)
2426
2427 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 2428 timeout):
2429 """Waits for changes in a job. 2430 2431 @type job_id: int 2432 @param job_id: Job identifier 2433 @type fields: list of strings 2434 @param fields: Which fields to check for changes 2435 @type prev_job_info: list or None 2436 @param prev_job_info: Last job information returned 2437 @type prev_log_serial: int 2438 @param prev_log_serial: Last job message serial number 2439 @type timeout: float 2440 @param timeout: maximum time to wait in seconds 2441 @rtype: tuple (job info, log entries) 2442 @return: a tuple of the job information as required via 2443 the fields parameter, and the log entries as a list 2444 2445 if the job has not changed and the timeout has expired, 2446 we instead return a special value, 2447 L{constants.JOB_NOTCHANGED}, which should be interpreted 2448 as such by the clients 2449 2450 """ 2451 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, 2452 writable=False) 2453 2454 helper = _WaitForJobChangesHelper() 2455 2456 return helper(self._GetJobPath(job_id), load_fn, 2457 fields, prev_job_info, prev_log_serial, timeout)
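A hedged client-side sketch of how the contract described above can be used; the queue object, field list and timeout are assumptions for illustration:

  from ganeti import constants

  def wait_for_status_change(queue, job_id, prev_info, timeout=30.0):
    result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
                                     prev_log_serial=0, timeout=timeout)
    if result == constants.JOB_NOTCHANGED:
      return prev_info, []           # nothing changed within the timeout
    job_info, log_entries = result
    return job_info, log_entries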
2458 2459 @locking.ssynchronized(_LOCK) 2460 @_RequireOpenQueue
2461 - def CancelJob(self, job_id):
2462 """Cancels a job. 2463 2464 This will only succeed if the job has not started yet. 2465 2466 @type job_id: int 2467 @param job_id: job ID of job to be cancelled. 2468 2469 """ 2470 logging.info("Cancelling job %s", job_id) 2471 2472 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2473 2474 @locking.ssynchronized(_LOCK) 2475 @_RequireOpenQueue
2476 - def ChangeJobPriority(self, job_id, priority):
2477 """Changes a job's priority. 2478 2479 @type job_id: int 2480 @param job_id: ID of the job whose priority should be changed 2481 @type priority: int 2482 @param priority: New priority 2483 2484 """ 2485 logging.info("Changing priority of job %s to %s", job_id, priority) 2486 2487 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2488 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2489 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2490 (priority, allowed)) 2491 2492 def fn(job): 2493 (success, msg) = job.ChangePriority(priority) 2494 2495 if success: 2496 try: 2497 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2498 except workerpool.NoSuchTask: 2499 logging.debug("Job %s is not in workerpool at this time", job.id) 2500 2501 return (success, msg)
2502 2503 return self._ModifyJobUnlocked(job_id, fn) 2504
2505 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2506 """Modifies a job. 2507 2508 @type job_id: int 2509 @param job_id: Job ID 2510 @type mod_fn: callable 2511 @param mod_fn: Modifying function, receiving job object as parameter, 2512 returning tuple of (status boolean, message string) 2513 2514 """ 2515 job = self._LoadJobUnlocked(job_id) 2516 if not job: 2517 logging.debug("Job %s not found", job_id) 2518 return (False, "Job %s not found" % job_id) 2519 2520 assert job.writable, "Can't modify read-only job" 2521 assert not job.archived, "Can't modify archived job" 2522 2523 (success, msg) = mod_fn(job) 2524 2525 if success: 2526 # If the job was finalized (e.g. cancelled), this is the final write 2527 # allowed. The job can be archived anytime. 2528 self.UpdateJobUnlocked(job) 2529 2530 return (success, msg)
2531 2532 @_RequireOpenQueue
2533 - def _ArchiveJobsUnlocked(self, jobs):
2534 """Archives jobs. 2535 2536 @type jobs: list of L{_QueuedJob} 2537 @param jobs: Job objects 2538 @rtype: int 2539 @return: Number of archived jobs 2540 2541 """ 2542 archive_jobs = [] 2543 rename_files = [] 2544 for job in jobs: 2545 assert job.writable, "Can't archive read-only job" 2546 assert not job.archived, "Can't archive an already archived job" 2547 2548 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2549 logging.debug("Job %s is not yet done", job.id) 2550 continue 2551 2552 archive_jobs.append(job) 2553 2554 old = self._GetJobPath(job.id) 2555 new = self._GetArchivedJobPath(job.id) 2556 rename_files.append((old, new)) 2557 2558 # TODO: What if 1..n files fail to rename? 2559 self._RenameFilesUnlocked(rename_files) 2560 2561 logging.debug("Successfully archived job(s) %s", 2562 utils.CommaJoin(job.id for job in archive_jobs)) 2563 2564 # Since we did not actually check above whether the renames succeeded, we 2565 # update the cached queue size from the filesystem. Once the TODO above is 2566 # addressed, the number of actually archived jobs can be used 2567 # instead. 2568 self._UpdateQueueSizeUnlocked() 2569 return len(archive_jobs)
2570 2571 @locking.ssynchronized(_LOCK) 2572 @_RequireOpenQueue
2573 - def ArchiveJob(self, job_id):
2574 """Archives a job. 2575 2576 This is just a wrapper over L{_ArchiveJobsUnlocked}. 2577 2578 @type job_id: int 2579 @param job_id: Job ID of job to be archived. 2580 @rtype: bool 2581 @return: Whether job was archived 2582 2583 """ 2584 logging.info("Archiving job %s", job_id) 2585 2586 job = self._LoadJobUnlocked(job_id) 2587 if not job: 2588 logging.debug("Job %s not found", job_id) 2589 return False 2590 2591 return self._ArchiveJobsUnlocked([job]) == 1
2592 2593 @locking.ssynchronized(_LOCK) 2594 @_RequireOpenQueue
2595 - def AutoArchiveJobs(self, age, timeout):
2596 """Archives all jobs based on age. 2597 2598 The method will archive all jobs which are older than the age 2599 parameter. For jobs that don't have an end timestamp, the start 2600 timestamp will be considered. The special '-1' age will cause 2601 archival of all jobs (that are not running or queued). 2602 2603 @type age: int 2604 @param age: the minimum age in seconds 2605 2606 """ 2607 logging.info("Archiving jobs with age more than %s seconds", age) 2608 2609 now = time.time() 2610 end_time = now + timeout 2611 archived_count = 0 2612 last_touched = 0 2613 2614 all_job_ids = self._GetJobIDsUnlocked() 2615 pending = [] 2616 for idx, job_id in enumerate(all_job_ids): 2617 last_touched = idx + 1 2618 2619 # Not optimal because jobs could be pending 2620 # TODO: Measure average duration for job archival and take number of 2621 # pending jobs into account. 2622 if time.time() > end_time: 2623 break 2624 2625 # Returns None if the job failed to load 2626 job = self._LoadJobUnlocked(job_id) 2627 if job: 2628 if job.end_timestamp is None: 2629 if job.start_timestamp is None: 2630 job_age = job.received_timestamp 2631 else: 2632 job_age = job.start_timestamp 2633 else: 2634 job_age = job.end_timestamp 2635 2636 if age == -1 or now - job_age[0] > age: 2637 pending.append(job) 2638 2639 # Archive 10 jobs at a time 2640 if len(pending) >= 10: 2641 archived_count += self._ArchiveJobsUnlocked(pending) 2642 pending = [] 2643 2644 if pending: 2645 archived_count += self._ArchiveJobsUnlocked(pending) 2646 2647 return (archived_count, len(all_job_ids) - last_touched)
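A small sketch of the age fall-back used above: prefer the end timestamp, then the start timestamp, then the time the job was received (all are (seconds, microseconds) tuples as produced by TimeStampNow):

  def _JobAgeTimestamp(job):
    # Hypothetical helper, equivalent to the nested if/else above.
    if job.end_timestamp is not None:
      return job.end_timestamp
    if job.start_timestamp is not None:
      return job.start_timestamp
    return job.received_timestamp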
2648
2649 - def _Query(self, fields, qfilter):
2650 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2651 namefield="id") 2652 2653 # Archived jobs are only looked at if the "archived" field is referenced 2654 # either as a requested field or in the filter. By default archived jobs 2655 # are ignored. 2656 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2657 2658 job_ids = qobj.RequestedNames() 2659 2660 list_all = (job_ids is None) 2661 2662 if list_all: 2663 # Since files are added to/removed from the queue atomically, there's no 2664 # risk of getting the job ids in an inconsistent state. 2665 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2666 2667 jobs = [] 2668 2669 for job_id in job_ids: 2670 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2671 if job is not None or not list_all: 2672 jobs.append((job_id, job)) 2673 2674 return (qobj, jobs, list_all)
2675
2676 - def QueryJobs(self, fields, qfilter):
2677 """Returns a list of jobs in queue. 2678 2679 @type fields: sequence 2680 @param fields: List of wanted fields 2681 @type qfilter: None or query2 filter (list) 2682 @param qfilter: Query filter 2683 2684 """ 2685 (qobj, ctx, _) = self._Query(fields, qfilter) 2686 2687 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2688
2689 - def OldStyleQueryJobs(self, job_ids, fields):
2690 """Returns a list of jobs in queue. 2691 2692 @type job_ids: list 2693 @param job_ids: sequence of job identifiers or None for all 2694 @type fields: list 2695 @param fields: names of fields to return 2696 @rtype: list 2697 @return: a list with one element per job, each element being a list with 2698 the requested fields 2699 2700 """ 2701 # backwards compat: 2702 job_ids = [int(jid) for jid in job_ids] 2703 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2704 2705 (qobj, ctx, _) = self._Query(fields, qfilter) 2706 2707 return qobj.OldStyleQuery(ctx, sort_by_name=False)
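For illustration, the simple filter built above restricts the query to the given IDs; the exact filter encoding shown in the comment is an assumption of this sketch:

  from ganeti import qlang

  job_ids = [12, 25]
  qfilter = qlang.MakeSimpleFilter("id", job_ids)
  # Roughly an OR of equality tests, e.g. ["|", ["=", "id", 12], ["=", "id", 25]]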
2708 2709 @locking.ssynchronized(_LOCK)
2710 - def PrepareShutdown(self):
2711 """Prepare to stop the job queue. 2712 2713 Disables execution of jobs in the workerpool and returns whether there are 2714 any jobs currently running. If the latter is the case, the job queue is not 2715 yet ready for shutdown. Once this function returns C{False}, L{Shutdown} can 2716 be called without interfering with any job. Queued and unfinished jobs will 2717 be resumed next time. 2718 2719 Once this function has been called no new job submissions will be accepted 2720 (see L{_RequireNonDrainedQueue}). 2721 2722 @rtype: bool 2723 @return: Whether there are any running jobs 2724 2725 """ 2726 if self._accepting_jobs: 2727 self._accepting_jobs = False 2728 2729 # Tell worker pool to stop processing pending tasks 2730 self._wpool.SetActive(False) 2731 2732 return self._wpool.HasRunningTasks()
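A hedged sketch of the intended shutdown sequence: stop accepting jobs, wait until no jobs are running, then shut the queue down (the polling loop and interval are assumptions for illustration):

  import time

  def ShutdownQueue(queue, poll_interval=1.0):
    while queue.PrepareShutdown():   # True while jobs are still running
      time.sleep(poll_interval)
    queue.Shutdown()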
2733
2734 - def AcceptingJobsUnlocked(self):
2735 """Returns whether jobs are accepted. 2736 2737 Once L{PrepareShutdown} has been called, no new jobs are accepted and the 2738 queue is shutting down. 2739 2740 @rtype: bool 2741 2742 """ 2743 return self._accepting_jobs
2744 2745 @locking.ssynchronized(_LOCK) 2746 @_RequireOpenQueue
2747 - def Shutdown(self):
2748 """Stops the job queue. 2749 2750 This shuts down all the worker threads and closes the queue. 2751 2752 """ 2753 self._wpool.TerminateWorkers() 2754 2755 self._queue_filelock.Close() 2756 self._queue_filelock = None
2757