Package ganeti :: Module jqueue
[hide private]
[frames | no frames]

Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import logging 
  33  import errno 
  34  import time 
  35  import weakref 
  36  import threading 
  37  import itertools 
  38  import operator 
  39   
  40  try: 
  41    # pylint: disable=E0611 
  42    from pyinotify import pyinotify 
  43  except ImportError: 
  44    import pyinotify 
  45   
  46  from ganeti import asyncnotifier 
  47  from ganeti import constants 
  48  from ganeti import serializer 
  49  from ganeti import workerpool 
  50  from ganeti import locking 
  51  from ganeti import opcodes 
  52  from ganeti import errors 
  53  from ganeti import mcpu 
  54  from ganeti import utils 
  55  from ganeti import jstore 
  56  from ganeti import rpc 
  57  from ganeti import runtime 
  58  from ganeti import netutils 
  59  from ganeti import compat 
  60  from ganeti import ht 
  61  from ganeti import query 
  62  from ganeti import qlang 
  63  from ganeti import pathutils 
  64  from ganeti import vcluster 
  65   
  66   
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator; they are the
# names of attributes holding the lock, not the lock objects themselves
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")
class CancelJob(Exception):
  """Exception raised to request that a job be canceled.

  It carries no data of its own; raising it is the whole signal.

  """
81
class QueueShutdown(Exception):
  """Exception raised to abort a job because the job queue is shutting down.

  It carries no data of its own; raising it is the whole signal.

  """
87
def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format, as
      produced by L{utils.SplitTime}

  """
  now = time.time()
  return utils.SplitTime(now)
97
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed through unchanged as the first argument of the call
  @param file_name: path of the job queue file; converted with
      L{vcluster.MakeVirtualPath} before being sent
  @param content: passed through unchanged as the new file content
  @return: whatever C{runner.call_jobqueue_update} returns

  """
  return runner.call_jobqueue_update(names,
                                     vcluster.MakeVirtualPath(file_name),
                                     content)
105
class _SimpleJobQuery:
  """Wrapper for job queries.

  The query object is built once from the list of fields and re-used for
  every call, which is useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Builds and caches the query for the given fields.

    @param fields: names of the job fields to query

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes the cached query against a single job.

    @param job: job object; its C{id} attribute is used as the entry name
    @return: the first (and only) row of the query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
124
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: current priority; may change during the opcode's lifetime
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

    # Nothing has happened to a freshly queued opcode yet
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(self, name, None)

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The "input", "status", "result" and "log" keys are mandatory; timestamps
    default to None and the priority to C{constants.OP_PRIO_DEFAULT}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __init__ is bypassed on purpose, every attribute comes from "state"
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    restored.status = state["status"]
    restored.result = state["result"]
    restored.log = state["log"]

    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name, None))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return restored

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    plain_attrs = ("status", "result", "log", "start_timestamp",
                   "exec_timestamp", "end_timestamp", "priority")

    data = dict((name, getattr(self, name)) for name in plain_attrs)
    # The opcode itself needs its own serialization
    data["input"] = self.input.__getstate__()
    return data
199
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: highest log entry serial number known for this job
      (0 if there are no log entries)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar archived: Whether the job was already archived

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id (converted to an integer)
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: if no opcodes are given

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state (see
    L{Serialize}) and are therefore set up both for new and restored jobs.

    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # __init__ is bypassed on purpose, every attribute comes from "state"
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial continues after the highest one found in any log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status (one of the C{constants.JOB_STATUS_*} values)

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # This function returns job statuses, so use the job-level constant
        # and not the opcode-level one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    # Numerically lowest value among the pending opcodes
    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    return [entry
            for op in self.ops
            for entry in op.log
            if entry[0] > serial]

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected to form a prefix of the list
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Only the end timestamp is recorded here; opcode statuses are untouched.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    Only opcodes which are still queued or waiting are modified.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)
528
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying one opcode's execution to its job and the job queue.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    @raise CancelJob: if the opcode has been marked as canceling
    @raise QueueShutdown: if the queue no longer accepts jobs

    """
    # A cancel request takes precedence over a queue shutdown
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    op = self._op
    job = self._job

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Last chance to bail out before the opcode is reported as running
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # The new status is passed on via the queue's job update
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    @param timestamp: time of the message, as a split time tuple
    @param log_type: type of the log entry
    @param log_msg: content of the log entry

    """
    job = self._job

    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    # Log messages are written without replication
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the entry type is
    C{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    @raise CancelJob: if the opcode has been marked as canceling
    @raise QueueShutdown: if the queue no longer accepts jobs

    """
    op = self._op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    queue = self._queue
    return queue.SubmitManyJobs(jobs)
635
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job changed or can no longer
        change, None otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    may_still_change = status in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING,
                                  constants.JOB_STATUS_WAITING)
    info_changed = job_info != self._prev_job_info
    new_log_entries = (bool(log_entries) and
                       self._prev_log_serial != log_entries[0][0])

    if may_still_change and not (info_changed or new_log_entries):
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
689
class _JobFileChangesWaiter(object):
  """Waits for changes to a job file using inotify.

  """
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @param _inotify_wm_cls: watch manager class, replaceable for tests
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    watch_manager = _inotify_wm_cls()
    self._wm = watch_manager

    handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                   self._OnInotify, filename)
    self._inotify_handler = handler

    self._notifier = pyinotify.Notifier(watch_manager,
                                        default_proc_fun=handler)
    try:
      handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    @param notifier_enabled: whether the handler is still enabled; if not, it
        is enabled again

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    # The timeout is scaled by 1000 before being passed to check_events
    have_events = self._notifier.check_events(timeout * 1000)
    if not have_events:
      return have_events

    self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    notifier = self._notifier
    notifier.stop()
739
class _JobChangesWaiter(object):
  """Waiter for job changes which sets up the file waiter only when needed.

  """
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @param _waiter_cls: class of the underlying file waiter, replaceable for
        tests

    """
    self._filename = filename
    self._waiter_cls = _waiter_cls
    # Created on the first call to Wait
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: Avoid inotify setup cost when job file has already
      # changed. If this point is reached, return immediately and let caller
      # check the job file again in case there were changes since the last
      # check. This avoids a race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    Nothing is done if the waiter was never set up.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
778
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and checks it for changes once.

    @param counter: iterator counting the checks done so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning None for an unchanged job
    @return: the value returned by C{check_fn} if it isn't None
    @raise errors.JobLost: if the job couldn't be loaded
    @raise utils.RetryAgain: if the job hasn't changed

    """
    attempt = counter.next()
    if attempt > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is None:
      raise utils.RetryAgain()

    return changes

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @param _waiter_cls: waiter class, replaceable for tests
    @return: the changes reported by L{_JobChangesChecker}, None if the job
        was lost or C{constants.JOB_NOTCHANGED} if the timeout expired

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        attempt_fn = compat.partial(self._CheckForChanges,
                                    counter, job_load_fn, check_fn)
        return utils.Retry(attempt_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter must be closed whatever the outcome of the retries
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
839
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  @param err: the exception to encode; anything which isn't an
      L{errors.GenericError} is wrapped into an L{errors.OpExecError}
      carrying its string representation
  @return: the result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
851
class _TimeoutStrategyWrapper:
  """Adds one-value look-ahead to a function producing timeouts.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: function returning the next timeout each time it's called

    """
    self._fn = fn
    # Timeout fetched but not yet consumed; None if there's none
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    A cached value of None can't be told apart from "nothing cached", hence
    the function is called again in that case.

    """
    if self._next is not None:
      return

    self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    The value is kept and handed out again by subsequent calls.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    current = self.Peek()
    self._next = None
    return current
883
class _OpExecContext:
  """State kept while a single opcode of a job is being processed.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @type op: L{_QueuedOpCode}
    @param op: the opcode this context belongs to
    @param index: position of the opcode within its job
    @param log_prefix: prefix used for log messages about this opcode
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    has_deps = getattr(op.input, opcodes.DEPEND_ATTR, None)
    if has_deps:
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the priority was increased

    """
    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if self._timeout_strategy.Peek() is not None:
      return False

    op = self.op
    if not op.priority > constants.OP_PRIO_HIGHEST:
      return False

    logging.debug("Increasing priority")
    # Numerically lower values mean higher priority
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    strategy = self._timeout_strategy
    return strategy.Next()
935
936 937 -class _JobProcessor(object):
938 (DEFER, 939 WAITDEP, 940 FINISHED) = range(1, 4) 941
def __init__(self, queue, opexec_fn, job,
             _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
  """Initializes this class.

  @param queue: the job queue the job belongs to
  @param opexec_fn: function called to execute an opcode
  @param job: the job to process
  @param _timeout_strategy_factory: Callable to create new timeout strategy,
      replaceable for tests

  """
  (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
  self._timeout_strategy_factory = _timeout_strategy_factory
@staticmethod
def _FindNextOpcode(job, timeout_strategy_factory):
  """Locates the next opcode to run.

  @type job: L{_QueuedJob}
  @param job: Job object
  @param timeout_strategy_factory: Callable to create new timeout strategy
  @rtype: L{_OpExecContext}
  @return: execution context for the first opcode not yet finalized
  @raise errors.ProgrammerError: if there's no opcode left or an opcode is
      marked as running

  """
  # Create some sort of a cache to speed up locating next opcode for future
  # lookups
  # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
  # pending and one for processed ops.
  if job.ops_iter is None:
    job.ops_iter = enumerate(job.ops)

  # The iterator is shared between calls, hence the search continues where
  # the previous one stopped
  for (idx, op) in job.ops_iter:
    if op.status == constants.OP_STATUS_RUNNING:
      # Found an opcode already marked as running
      raise errors.ProgrammerError("Called for job marked as running")

    opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                           timeout_strategy_factory)

    if op.status not in constants.OPS_FINALIZED:
      return opctx

    # This is a job that was partially completed before master daemon
    # shutdown, so it can be expected that some opcodes are already
    # completed successfully (if any did error out, then the whole job
    # should have been aborted and not resubmitted for processing).
    logging.info("%s: opcode %s already processed, skipping",
                 opctx.log_prefix, opctx.summary)

  # No opcodes left
  raise errors.ProgrammerError("Called for a finished job")
@staticmethod
def _MarkWaitlock(job, op):
  """Marks an opcode as waiting for locks.

  The job's start timestamp is also set if necessary.

  @type job: L{_QueuedJob}
  @param job: Job object
  @type op: L{_QueuedOpCode}
  @param op: Opcode object
  @rtype: bool
  @return: Whether the status or a timestamp was changed

  """
  assert op in job.ops
  assert op.status in (constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_WAITING)

  # The result is always reset and doesn't count as a change
  op.result = None

  changed = False

  if op.status == constants.OP_STATUS_QUEUED:
    op.status = constants.OP_STATUS_WAITING
    changed = True

  if op.start_timestamp is None:
    op.start_timestamp = TimeStampNow()
    changed = True

  if job.start_timestamp is None:
    # The job starts with the first of its opcodes
    job.start_timestamp = op.start_timestamp
    changed = True

  assert op.status == constants.OP_STATUS_WAITING

  return changed
@staticmethod
def _CheckDependencies(queue, job, opctx):
  """Checks if an opcode has dependencies and if so, processes them.

  Dependencies are checked in order; the ones which are fulfilled are removed
  from C{opctx.jobdeps}.

  @type queue: L{JobQueue}
  @param queue: Queue object
  @type job: L{_QueuedJob}
  @param job: Job object
  @type opctx: L{_OpExecContext}
  @param opctx: Opcode execution context
  @rtype: bool
  @return: Whether opcode will be re-scheduled by dependency tracker

  """
  op = opctx.op

  while opctx.jobdeps:
    (dep_job_id, dep_status) = opctx.jobdeps[0]

    (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                        dep_status)
    assert ht.TNonEmptyString(depmsg), "No dependency message"

    logging.info("%s: %s", opctx.log_prefix, depmsg)

    if depresult == _JobDependencyManager.CONTINUE:
      # Remove dependency and continue
      opctx.jobdeps.pop(0)
      continue

    if depresult == _JobDependencyManager.WAIT:
      # Need to wait for notification, dependency tracker will re-add job
      # to workerpool
      return True

    if depresult == _JobDependencyManager.CANCEL:
      # Job was cancelled, cancel this job as well
      job.Cancel()
      assert op.status == constants.OP_STATUS_CANCELING
      return False

    if depresult in (_JobDependencyManager.WRONGSTATUS,
                     _JobDependencyManager.ERROR):
      # Job failed or there was an error, this job must fail
      op.status = constants.OP_STATUS_ERROR
      op.result = _EncodeOpError(errors.OpExecError(depmsg))
      return False

    raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                 depresult)

  # No dependencies (left)
  return False
1083
1084 - def _ExecOpCodeUnlocked(self, opctx):
1085 """Processes one opcode and returns the result. 1086 1087 """ 1088 op = opctx.op 1089 1090 assert op.status == constants.OP_STATUS_WAITING 1091 1092 timeout = opctx.GetNextLockTimeout() 1093 1094 try: 1095 # Make sure not to hold queue lock while calling ExecOpCode 1096 result = self.opexec_fn(op.input, 1097 _OpExecCallbacks(self.queue, self.job, op), 1098 timeout=timeout) 1099 except mcpu.LockAcquireTimeout: 1100 assert timeout is not None, "Received timeout for blocking acquire" 1101 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1102 1103 assert op.status in (constants.OP_STATUS_WAITING, 1104 constants.OP_STATUS_CANCELING) 1105 1106 # Was job cancelled while we were waiting for the lock? 1107 if op.status == constants.OP_STATUS_CANCELING: 1108 return (constants.OP_STATUS_CANCELING, None) 1109 1110 # Queue is shutting down, return to queued 1111 if not self.queue.AcceptingJobsUnlocked(): 1112 return (constants.OP_STATUS_QUEUED, None) 1113 1114 # Stay in waitlock while trying to re-acquire lock 1115 return (constants.OP_STATUS_WAITING, None) 1116 except CancelJob: 1117 logging.exception("%s: Canceling job", opctx.log_prefix) 1118 assert op.status == constants.OP_STATUS_CANCELING 1119 return (constants.OP_STATUS_CANCELING, None) 1120 1121 except QueueShutdown: 1122 logging.exception("%s: Queue is shutting down", opctx.log_prefix) 1123 1124 assert op.status == constants.OP_STATUS_WAITING 1125 1126 # Job hadn't been started yet, so it should return to the queue 1127 return (constants.OP_STATUS_QUEUED, None) 1128 1129 except Exception, err: # pylint: disable=W0703 1130 logging.exception("%s: Caught exception in %s", 1131 opctx.log_prefix, opctx.summary) 1132 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1133 else: 1134 logging.debug("%s: %s successful", 1135 opctx.log_prefix, opctx.summary) 1136 return (constants.OP_STATUS_SUCCESS, result)
1137
def __call__(self, _nextop_fn=None):
  """Continues execution of a job.

  At most one opcode is processed per call. The queue lock is held in
  shared mode for the whole call, except while the opcode itself is being
  executed.

  @param _nextop_fn: Callback function for tests
  @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
    be deferred and C{WAITDEP} if the dependency manager
    (L{_JobDependencyManager}) will re-schedule the job when appropriate

  """
  queue = self.queue
  job = self.job

  logging.debug("Processing job %s", job.id)

  queue.acquire(shared=1)
  try:
    opcount = len(job.ops)

    assert job.writable, "Expected writable job"

    # Don't do anything for finalized jobs
    if job.CalcStatus() in constants.JOBS_FINALIZED:
      return self.FINISHED

    # Is a previous opcode still pending?
    if job.cur_opctx:
      opctx = job.cur_opctx
      job.cur_opctx = None
    else:
      # Test hook, only invoked when assertions are enabled
      if __debug__ and _nextop_fn:
        _nextop_fn()
      opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

    op = opctx.op

    # Consistency check
    assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                   constants.OP_STATUS_CANCELING)
                      for i in job.ops[opctx.index + 1:])

    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    assert (op.priority <= constants.OP_PRIO_LOWEST and
            op.priority >= constants.OP_PRIO_HIGHEST)

    # Set to the result of the dependency check below; stays None if the
    # opcode is already being cancelled
    waitjob = None

    if op.status != constants.OP_STATUS_CANCELING:
      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING)

      # Prepare to start opcode
      if self._MarkWaitlock(job, op):
        # Write to disk
        queue.UpdateJobUnlocked(job)

      assert op.status == constants.OP_STATUS_WAITING
      assert job.CalcStatus() == constants.JOB_STATUS_WAITING
      assert job.start_timestamp and op.start_timestamp
      assert waitjob is None

      # Check if waiting for a job is necessary
      waitjob = self._CheckDependencies(queue, job, opctx)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_ERROR)

      # Only execute the opcode if no dependency is outstanding and the
      # dependency check didn't cancel or fail it
      if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                       constants.OP_STATUS_ERROR)):
        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        assert not opctx.jobdeps, "Not all dependencies were removed"

        # The queue lock is released while the opcode is executed and
        # always re-acquired afterwards, even if execution raised
        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

    # Decide what happens to the job as a whole based on the opcode status
    if op.status == constants.OP_STATUS_QUEUED:
      # Queue is shutting down
      assert not waitjob

      finalize = False

      # Reset context
      job.cur_opctx = None

      # In no case must the status be finalized here
      assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

    elif op.status == constants.OP_STATUS_WAITING or waitjob:
      finalize = False

      if not waitjob and opctx.CheckPriorityIncrease():
        # Priority was changed, need to update on-disk file
        queue.UpdateJobUnlocked(job)

      # Keep around for another round
      job.cur_opctx = opctx

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # In no case must the status be finalized here
      assert job.CalcStatus() == constants.JOB_STATUS_WAITING

    else:
      # Ensure all opcodes so far have been successful
      assert (opctx.index == 0 or
              compat.all(i.status == constants.OP_STATUS_SUCCESS
                         for i in job.ops[:opctx.index]))

      # Reset context
      job.cur_opctx = None

      if op.status == constants.OP_STATUS_SUCCESS:
        finalize = False

      elif op.status == constants.OP_STATUS_ERROR:
        # Ensure failed opcode has an exception as its result
        assert errors.GetEncodedError(job.ops[opctx.index].result)

        to_encode = errors.OpExecError("Preceding opcode failed")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        finalize = True

        # Consistency check
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
                          errors.GetEncodedError(i.result)
                          for i in job.ops[opctx.index:])

      elif op.status == constants.OP_STATUS_CANCELING:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
        finalize = True

      else:
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)

      if opctx.index == (opcount - 1):
        # Finalize on last opcode
        finalize = True

      if finalize:
        # All opcodes have been run, finalize job
        job.Finalize()

      # Write to disk. If the job status is final, this is the final write
      # allowed. Once the file has been written, it can be archived anytime.
      queue.UpdateJobUnlocked(job)

      assert not waitjob

      if finalize:
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
        return self.FINISHED

    assert not waitjob or queue.depmgr.JobWaiting(job)

    if waitjob:
      return self.WAITDEP
    else:
      return self.DEFER
  finally:
    # Runs on every exit path, including the early returns above
    assert job.writable, "Job became read-only while being processed"
    queue.release()
1330
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts upon the value returned by a L{_JobProcessor} run.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager notified when the job has finished
  @param job: Job which was processed
  @param result: Return value of the job processor
  @raise workerpool.DeferTask: if the job has to be scheduled again
  @raise errors.ProgrammerError: if C{result} is not a known value

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do, dependency manager will re-schedule
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1353
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread processing jobs taken from the job queue.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job can show up in the tasklist more than once: if it registers for
    # a dependency job and that job notifies before the first worker is done.
    # The per-job lock ensures only one worker is active on a job at a time.
    per_job_lock = job.processor_lock
    per_job_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      per_job_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the worker thread after the job and, if given, the opcode.

      """
      self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    processor = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               processor.ExecOpCode)

    outcome = _JobProcessor(queue, execop_fn, job)()

    _EvaluateJobProcessorResult(queue.depmgr, job, outcome)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    The name is reset once the opcode has been executed, whether execution
    succeeded or raised an exception.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode passed on to C{execop_fn}, followed by any further
               positional and keyword arguments
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      outcome = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return outcome

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string
    @return: The job name, followed by a slash and the opcode's tiny summary
      if an opcode was given

    """
    name = "Job%s" % job.id

    if not op:
      return "/".join([name])

    return "/".join([name, op.TinySummary()])
1427
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} threads for a job queue.

  """
  def __init__(self, queue):
    """Sets up the pool and remembers the queue it belongs to.

    @param queue: Job queue object, stored as C{self.queue}

    """
    base_init = super(_JobQueueWorkerPool, self).__init__
    base_init("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1438
1439 1440 -class _JobDependencyManager:
1441 """Keeps track of job dependencies. 1442 1443 """ 1444 (WAIT, 1445 ERROR, 1446 CANCEL, 1447 CONTINUE, 1448 WRONGSTATUS) = range(1, 6) 1449
1450 - def __init__(self, getstatus_fn, enqueue_fn):
1451 """Initializes this class. 1452 1453 """ 1454 self._getstatus_fn = getstatus_fn 1455 self._enqueue_fn = enqueue_fn 1456 1457 self._waiters = {} 1458 self._lock = locking.SharedLock("JobDepMgr")
1459 1460 @locking.ssynchronized(_LOCK, shared=1)
1461 - def GetLockInfo(self, requested): # pylint: disable=W0613
1462 """Retrieves information about waiting jobs. 1463 1464 @type requested: set 1465 @param requested: Requested information, see C{query.LQ_*} 1466 1467 """ 1468 # No need to sort here, that's being done by the lock manager and query 1469 # library. There are no priorities for notifying jobs, hence all show up as 1470 # one item under "pending". 1471 return [("job/%s" % job_id, None, None, 1472 [("job", [job.id for job in waiters])]) 1473 for job_id, waiters in self._waiters.items() 1474 if waiters]
1475 1476 @locking.ssynchronized(_LOCK, shared=1)
1477 - def JobWaiting(self, job):
1478 """Checks if a job is waiting. 1479 1480 """ 1481 return compat.any(job in jobs 1482 for jobs in self._waiters.values())
1483 1484 @locking.ssynchronized(_LOCK)
1485 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1486 """Checks if a dependency job has the requested status. 1487 1488 If the other job is not yet in a finalized status, the calling job will be 1489 notified (re-added to the workerpool) at a later point. 1490 1491 @type job: L{_QueuedJob} 1492 @param job: Job object 1493 @type dep_job_id: int 1494 @param dep_job_id: ID of dependency job 1495 @type dep_status: list 1496 @param dep_status: Required status 1497 1498 """ 1499 assert ht.TJobId(job.id) 1500 assert ht.TJobId(dep_job_id) 1501 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1502 1503 if job.id == dep_job_id: 1504 return (self.ERROR, "Job can't depend on itself") 1505 1506 # Get status of dependency job 1507 try: 1508 status = self._getstatus_fn(dep_job_id) 1509 except errors.JobLost, err: 1510 return (self.ERROR, "Dependency error: %s" % err) 1511 1512 assert status in constants.JOB_STATUS_ALL 1513 1514 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1515 1516 if status not in constants.JOBS_FINALIZED: 1517 # Register for notification and wait for job to finish 1518 job_id_waiters.add(job) 1519 return (self.WAIT, 1520 "Need to wait for job %s, wanted status '%s'" % 1521 (dep_job_id, dep_status)) 1522 1523 # Remove from waiters list 1524 if job in job_id_waiters: 1525 job_id_waiters.remove(job) 1526 1527 if (status == constants.JOB_STATUS_CANCELED and 1528 constants.JOB_STATUS_CANCELED not in dep_status): 1529 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1530 1531 elif not dep_status or status in dep_status: 1532 return (self.CONTINUE, 1533 "Dependency job %s finished with status '%s'" % 1534 (dep_job_id, status)) 1535 1536 else: 1537 return (self.WRONGSTATUS, 1538 "Dependency job %s finished with status '%s'," 1539 " not one of '%s' as required" % 1540 (dep_job_id, status, utils.CommaJoin(dep_status)))
1541
1542 - def _RemoveEmptyWaitersUnlocked(self):
1543 """Remove all jobs without actual waiters. 1544 1545 """ 1546 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1547 if not waiters]: 1548 del self._waiters[job_id]
1549
1550 - def NotifyWaiters(self, job_id):
1551 """Notifies all jobs waiting for a certain job ID. 1552 1553 @attention: Do not call until L{CheckAndRegister} returned a status other 1554 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1555 @type job_id: int 1556 @param job_id: Job ID 1557 1558 """ 1559 assert ht.TJobId(job_id) 1560 1561 self._lock.acquire() 1562 try: 1563 self._RemoveEmptyWaitersUnlocked() 1564 1565 jobs = self._waiters.pop(job_id, None) 1566 finally: 1567 self._lock.release() 1568 1569 if jobs: 1570 # Re-add jobs to workerpool 1571 logging.debug("Re-adding %s jobs which were waiting for job %s", 1572 len(jobs), job_id) 1573 self._enqueue_fn(jobs)
1574
1575 1576 -def _RequireOpenQueue(fn):
1577 """Decorator for "public" functions. 1578 1579 This function should be used for all 'public' functions. That is, 1580 functions usually called from other classes. Note that this should 1581 be applied only to methods (not plain functions), since it expects 1582 that the decorated function is called with a first argument that has 1583 a '_queue_filelock' argument. 1584 1585 @warning: Use this decorator only after locking.ssynchronized 1586 1587 Example:: 1588 @locking.ssynchronized(_LOCK) 1589 @_RequireOpenQueue 1590 def Example(self): 1591 pass 1592 1593 """ 1594 def wrapper(self, *args, **kwargs): 1595 # pylint: disable=W0212 1596 assert self._queue_filelock is not None, "Queue should be open" 1597 return fn(self, *args, **kwargs)
1598 return wrapper 1599
1600 1601 -def _RequireNonDrainedQueue(fn):
1602 """Decorator checking for a non-drained queue. 1603 1604 To be used with functions submitting new jobs. 1605 1606 """ 1607 def wrapper(self, *args, **kwargs): 1608 """Wrapper function. 1609 1610 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1611 1612 """ 1613 # Ok when sharing the big job queue lock, as the drain file is created when 1614 # the lock is exclusive. 1615 # Needs access to protected member, pylint: disable=W0212 1616 if self._drained: 1617 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1618 1619 if not self._accepting_jobs: 1620 raise errors.JobQueueError("Job queue is shutting down, refusing job") 1621 1622 return fn(self, *args, **kwargs)
1623 return wrapper 1624
1625 1626 -class JobQueue(object):
1627 """Queue used to manage the jobs. 1628 1629 """
def __init__(self, context):
  """Constructor for JobQueue.

  The constructor will initialize the job queue object and then
  start loading the current jobs from disk, either for starting them
  (if they were queued) or for aborting them (if they were already
  running).

  @type context: GanetiContext
  @param context: the context object for access to the configuration
      data and other ganeti objects

  """
  self.context = context
  # Cache of loaded jobs; entries are only weakly referenced
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = netutils.Hostname.GetSysName()

  # The Big JobQueue lock. If a code block or method acquires it in shared
  # mode, it must be safe to run concurrently with all other code acquiring
  # it in shared mode, including itself. Code which doesn't acquire it at
  # all must be safe to run concurrently with all code acquiring it in
  # shared mode and all code acquiring it exclusively.
  self._lock = locking.SharedLock("JobQueue")

  self.acquire = self._lock.acquire
  self.release = self._lock.release

  # Accept jobs by default
  self._accepting_jobs = True

  # Initialize the queue, and acquire the filelock.
  # This ensures no other process is working on the job queue.
  self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Get initial list of nodes (master candidates only), mapping node name
  # to primary IP address
  self._nodes = dict((n.name, n.primary_ip)
                     for n in self.context.cfg.GetAllNodesInfo().values()
                     if n.master_candidate)

  # Remove master node
  self._nodes.pop(self._my_hostname, None)

  # TODO: Check consistency across nodes

  self._queue_size = None
  self._UpdateQueueSizeUnlocked()
  assert ht.TInt(self._queue_size)
  self._drained = jstore.CheckDrainFlag()

  # Job dependencies
  self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                      self._EnqueueJobs)
  self.context.glm.AddToLockMonitor(self.depmgr)

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
  try:
    self._InspectQueue()
  except:
    # Whatever went wrong, terminate the workers before passing the
    # exception on
    self._wpool.TerminateWorkers()
    raise
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
  """Loads the whole job queue and resumes unfinished jobs.

  Queued jobs and jobs which were waiting are enqueued again, the latter
  after resetting their unfinished opcodes. Jobs found running or being
  cancelled are marked as failed and finalized.

  This function needs the lock here because WorkerPool.AddTask() may start a
  job while we're still doing our work.

  """
  logging.info("Inspecting job queue")

  to_restart = []

  job_ids = self._GetJobIDsUnlocked()
  total = len(job_ids)
  last_report = time.time()

  for (idx, job_id) in enumerate(job_ids):
    # Give an update every 1000 jobs or 10 seconds
    if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
        idx == (total - 1)):
      logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                   idx, total - 1, 100.0 * (idx + 1) / total)
      last_report = time.time()

    job = self._LoadJobUnlocked(job_id)

    # a failure in loading the job can cause 'None' to be returned
    if job is None:
      continue

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      to_restart.append(job)
      continue

    if status not in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
      # Nothing to do for any other status
      continue

    logging.warning("Unfinished job %s found: %s", job.id, job)

    if status == constants.JOB_STATUS_WAITING:
      # Restart job
      job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
      to_restart.append(job)
    else:
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            "Unclean master daemon shutdown")
      job.Finalize()

    self.UpdateJobUnlocked(job)

  if to_restart:
    logging.info("Restarting %s jobs", len(to_restart))
    self._EnqueueJobsUnlocked(to_restart)

  logging.info("Job queue inspection finished")
1753
def _GetRpc(self, address_list):
  """Creates an RPC runner for job queue calls using this queue's context.

  @param address_list: Address list handed to L{rpc.JobQueueRunner}
  @return: The newly created runner

  """
  runner = rpc.JobQueueRunner(self.context, address_list)
  return runner
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node):
  """Register a new node with the queue.

  The queue directory on the node is purged first. If the node is a master
  candidate, all non-archived job files, the serial file and the drain flag
  are then copied to it. Failures are logged, not raised.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  node_name = node.name
  assert node_name != self._my_hostname

  # Clean queue directory on added node
  purge_result = self._GetRpc(None).call_jobqueue_purge(node_name)
  purge_msg = purge_result.fail_msg
  if purge_msg:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    node_name, purge_msg)

  if not node.master_candidate:
    # remove if existing, ignoring errors
    self._nodes.pop(node_name, None)
    # and skip the replication of the job ids
    return

  # Upload the whole queue excluding archived jobs, followed by the current
  # serial file
  to_upload = ([self._GetJobPath(job_id)
                for job_id in self._GetJobIDsUnlocked()] +
               [pathutils.JOB_QUEUE_SERIAL_FILE])

  # Static address list
  addrs = [node.primary_ip]

  for file_name in to_upload:
    # Read file content
    content = utils.ReadFile(file_name)

    upload_result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                                  file_name, content)
    upload_msg = upload_result[node_name].fail_msg
    if upload_msg:
      logging.error("Failed to upload file %s to node %s: %s",
                    file_name, node_name, upload_msg)

  # Set queue drained flag
  runner = self._GetRpc(addrs)
  drain_result = runner.call_jobqueue_set_drain_flag([node_name],
                                                     self._drained)
  drain_msg = drain_result[node_name].fail_msg
  if drain_msg:
    logging.error("Failed to set queue drained flag on node %s: %s",
                  node_name, drain_msg)

  self._nodes[node_name] = node.primary_ip
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def RemoveNode(self, node_name):
  """Callback called when removing nodes from the cluster.

  Unknown node names are silently ignored.

  @type node_name: str
  @param node_name: the name of the node to remove

  """
  known_nodes = self._nodes
  known_nodes.pop(node_name, None)
1826 1827 @staticmethod
1828 - def _CheckRpcResult(result, nodes, failmsg):
1829 """Verifies the status of an RPC call. 1830 1831 Since we aim to keep consistency should this node (the current 1832 master) fail, we will log errors if our rpc fail, and especially 1833 log the case when more than half of the nodes fails. 1834 1835 @param result: the data as returned from the rpc call 1836 @type nodes: list 1837 @param nodes: the list of nodes we made the call to 1838 @type failmsg: str 1839 @param failmsg: the identifier to be used for logging 1840 1841 """ 1842 failed = [] 1843 success = [] 1844 1845 for node in nodes: 1846 msg = result[node].fail_msg 1847 if msg: 1848 failed.append(node) 1849 logging.error("RPC call %s (%s) failed on node %s: %s", 1850 result[node].call, failmsg, node, msg) 1851 else: 1852 success.append(node) 1853 1854 # +1 for the master node 1855 if (len(success) + 1) < len(failed): 1856 # TODO: Handle failing nodes 1857 logging.error("More than half of the nodes failed")
1858
1859 - def _GetNodeIp(self):
1860 """Helper for returning the node name/ip list. 1861 1862 @rtype: (list, list) 1863 @return: a tuple of two lists, the first one with the node 1864 names and the second one with the node addresses 1865 1866 """ 1867 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1868 name_list = self._nodes.keys() 1869 addr_list = [self._nodes[name] for name in name_list] 1870 return name_list, addr_list
1871
def _UpdateJobQueueFile(self, file_name, data, replicate):
  """Writes a file locally and then replicates it to all nodes.

  This function will replace the contents of a file on the local
  node and then, if requested, replicate it to all the other nodes we
  have. Replication failures are logged by L{_CheckRpcResult}.

  @type file_name: str
  @param file_name: the path of the file to be replicated
  @type data: str
  @param data: the new contents of the file
  @type replicate: boolean
  @param replicate: whether to spread the changes to the remote nodes

  """
  ents = runtime.GetEnts()
  utils.WriteFile(file_name, data=data,
                  uid=ents.masterd_uid, gid=ents.daemons_gid,
                  mode=constants.JOB_QUEUE_FILES_PERMS)

  if not replicate:
    return

  (names, addrs) = self._GetNodeIp()
  rpc_result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
  self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
1895
def _RenameFilesUnlocked(self, rename):
  """Renames files locally and then replicates the change.

  This function will rename files in the local queue directory
  and then replicate the renames to all the other nodes we have.

  @type rename: list of (old, new)
  @param rename: List containing tuples mapping old to new names

  """
  # Rename them locally
  for (old_name, new_name) in rename:
    utils.RenameFile(old_name, new_name, mkdir=True)

  # ... and on all nodes
  (names, addrs) = self._GetNodeIp()
  runner = self._GetRpc(addrs)
  rpc_result = runner.call_jobqueue_rename(names, rename)
  self._CheckRpcResult(rpc_result, self._nodes,
                       "Renaming files (%r)" % rename)
1914
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster. The new
  serial number is written to the serial file (and replicated) before the
  in-memory counter is updated.

  @type count: integer
  @param count: how many serials to return
  @rtype: list of int
  @return: a list of job identifiers.

  """
  assert ht.TNonNegativeInt(count)

  first_new = self._last_serial + 1
  last_new = self._last_serial + count

  # Write to file
  self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                           "%s\n" % last_new, True)

  job_ids = [jstore.FormatJobID(serial)
             for serial in range(first_new, last_new + 1)]

  # Keep it only if we were able to write the file
  self._last_serial = last_new

  assert len(job_ids) == count

  return job_ids
@staticmethod
def _GetJobPath(job_id):
  """Returns the job file for a given job id.

  The file is named C{job-<id>} and lives directly in the queue directory.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the job file

  """
  file_name = "job-%s" % job_id
  return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
@staticmethod
def _GetArchivedJobPath(job_id):
  """Returns the archived job file for a given job id.

  The file is named C{job-<id>} and lives in the sub-directory of the
  archive directory returned by L{jstore.GetArchiveDirectory}.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the archived job file

  """
  archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
  sub_directory = jstore.GetArchiveDirectory(job_id)
  file_name = "job-%s" % job_id

  return utils.PathJoin(archive_root, sub_directory, file_name)
@staticmethod
def _DetermineJobDirectories(archived):
  """Build list of directories containing job files.

  @type archived: bool
  @param archived: Whether to include directories for archived jobs
  @rtype: list
  @return: the queue directory, followed by all visible entries of the
    archive directory if C{archived} is set

  """
  directories = [pathutils.QUEUE_DIR]

  if not archived:
    return directories

  archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
  for name in utils.ListVisibleFiles(archive_root):
    directories.append(utils.PathJoin(archive_root, name))

  return directories
@classmethod
def _GetJobIDsUnlocked(cls, sort=True, archived=False):
  """Return all known job IDs.

  The method only looks at disk because it's a requirement that all
  jobs are present on disk (so in the _memcache we don't have any
  extra IDs).

  @type sort: boolean
  @param sort: perform sorting on the returned job ids
  @type archived: boolean
  @param archived: whether to include the IDs of archived jobs
  @rtype: list
  @return: the list of job IDs

  """
  job_ids = []

  for directory in cls._DetermineJobDirectories(archived):
    matches = (constants.JOB_FILE_RE.match(name)
               for name in utils.ListVisibleFiles(directory))
    # Files not matching the job file pattern are ignored
    job_ids.extend(int(m.group(1)) for m in matches if m)

  if sort:
    job_ids.sort()

  return job_ids
2014
def _LoadJobUnlocked(self, job_id):
  """Loads a job from the disk or memory.

  Given a job id, this will return the cached job object if
  existing, or try to load the job from the disk. If loading from
  disk, it will also add the job to the cache. A job file which can't be
  parsed is moved to the archive.

  @type job_id: int
  @param job_id: the job id
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  cached = self._memcache.get(job_id, None)
  if cached:
    logging.debug("Found job %s in memcache", job_id)
    assert cached.writable, "Found read-only job in memcache"
    return cached

  try:
    job = self._LoadJobFromDisk(job_id, False)
  except errors.JobFileCorrupted:
    live_path = self._GetJobPath(job_id)
    archive_path = self._GetArchivedJobPath(job_id)

    if live_path != archive_path:
      # non-archived case
      logging.exception("Can't parse job %s, will archive.", job_id)
      self._RenameFilesUnlocked([(live_path, archive_path)])
    else:
      # job already archived (future case)
      logging.exception("Can't parse job %s", job_id)

    return None

  if job is None:
    return job

  assert job.writable, "Job just loaded is not writable"

  self._memcache[job_id] = job
  logging.debug("Added job %s to the cache", job_id)
  return job
2055
2056 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2057 """Load the given job file from disk. 2058 2059 Given a job file, read, load and restore it in a _QueuedJob format. 2060 2061 @type job_id: int 2062 @param job_id: job identifier 2063 @type try_archived: bool 2064 @param try_archived: Whether to try loading an archived job 2065 @rtype: L{_QueuedJob} or None 2066 @return: either None or the job object 2067 2068 """ 2069 path_functions = [(self._GetJobPath, False)] 2070 2071 if try_archived: 2072 path_functions.append((self._GetArchivedJobPath, True)) 2073 2074 raw_data = None 2075 archived = None 2076 2077 for (fn, archived) in path_functions: 2078 filepath = fn(job_id) 2079 logging.debug("Loading job from %s", filepath) 2080 try: 2081 raw_data = utils.ReadFile(filepath) 2082 except EnvironmentError, err: 2083 if err.errno != errno.ENOENT: 2084 raise 2085 else: 2086 break 2087 2088 if not raw_data: 2089 return None 2090 2091 if writable is None: 2092 writable = not archived 2093 2094 try: 2095 data = serializer.LoadJson(raw_data) 2096 job = _QueuedJob.Restore(self, data, writable, archived) 2097 except Exception, err: # pylint: disable=W0703 2098 raise errors.JobFileCorrupted(err) 2099 2100 return job
2101
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
  """Load the given job file from disk.

  Given a job file, read, load and restore it in a _QueuedJob format.
  In case of error reading the job, it gets returned as None, and the
  exception is logged.

  @type job_id: int
  @param job_id: job identifier
  @type try_archived: bool
  @param try_archived: Whether to try loading an archived job
  @param writable: Passed on unchanged to L{_LoadJobFromDisk}
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  try:
    job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
  except (errors.JobFileCorrupted, EnvironmentError):
    logging.exception("Can't load/parse job %s", job_id)
    job = None

  return job
2122
2123 - def _UpdateQueueSizeUnlocked(self):
2124 """Update the queue size. 2125 2126 """ 2127 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag):
  """Sets the drain flag for the queue, locally and on all nodes.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @rtype: boolean
  @return: always True

  """
  # Local state first: the flag kept by jstore, then the in-memory copy
  jstore.SetDrainFlag(drain_flag)
  self._drained = drain_flag

  # ... and afterwards the same change on all nodes
  (node_names, node_addrs) = self._GetNodeIp()
  rpc_runner = self._GetRpc(node_addrs)
  rpc_result = rpc_runner.call_jobqueue_set_drain_flag(node_names,
                                                       drain_flag)
  self._CheckRpcResult(rpc_result, self._nodes,
                       "Setting queue drain flag to %s" % drain_flag)

  return True
@_RequireOpenQueue
def _SubmitJobUnlocked(self, job_id, ops):
  """Create and store a new job.

  The opcodes are validated, the job is written to disk and then added
  to the in-memory cache. Handing the job to the worker pool is not done
  here; callers such as L{SubmitJob} do that with the returned object.

  @type job_id: job ID
  @param job_id: the job ID for the new job
  @type ops: list
  @param ops: The list of OpCodes that will become the new job.
  @rtype: L{_QueuedJob}
  @return: the job object to be queued
  @raise errors.JobQueueFull: if the job queue has too many jobs in it
  @raise errors.GenericError: If an opcode is not valid

  """
  if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
    raise errors.JobQueueFull()

  job = _QueuedJob(self, job_id, ops, True)

  for (pos, queued_op) in enumerate(job.ops):
    # Only priorities allowed at submission time are accepted
    if queued_op.priority not in constants.OP_PRIO_SUBMIT_VALID:
      valid_prios = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                " are %s" %
                                (pos, queued_op.priority, valid_prios))

    # Dependencies must pass the "no relative job IDs" check by now
    deps = getattr(queued_op.input, opcodes.DEPEND_ATTR, None)
    if not opcodes.TNoRelativeJobDependencies(deps):
      raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                " match %s: %s" %
                                (pos, opcodes.TNoRelativeJobDependencies,
                                 deps))

  # Write to disk
  self.UpdateJobUnlocked(job)

  self._queue_size += 1

  logging.debug("Adding new job %s to the cache", job_id)
  self._memcache[job_id] = job

  return job
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitJob(self, ops):
  """Create, store and enqueue a single new job.

  @see: L{_SubmitJobUnlocked}
  @type ops: list
  @param ops: The list of OpCodes that will become the new job
  @return: the ID allocated for the new job

  """
  (job_id, ) = self._NewSerialsUnlocked(1)
  job = self._SubmitJobUnlocked(job_id, ops)
  self._EnqueueJobsUnlocked([job])
  return job_id
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitManyJobs(self, jobs):
  """Create, store and enqueue multiple jobs.

  One job ID is allocated per entry of C{jobs}; only the jobs which were
  stored successfully are handed to the worker pool.

  @see: L{_SubmitJobUnlocked}
  @type jobs: list
  @param jobs: one list of opcodes per job to be created
  @rtype: list
  @return: the per-job results built by L{_SubmitManyJobsUnlocked}

  """
  job_ids = self._NewSerialsUnlocked(len(jobs))
  (results, new_jobs) = self._SubmitManyJobsUnlocked(jobs, job_ids, [])
  self._EnqueueJobsUnlocked(new_jobs)
  return results
@staticmethod
def _FormatSubmitError(msg, ops):
  """Formats errors which occurred while submitting a job.

  @param msg: the error message
  @param ops: the opcodes of the failed job; their summaries are
      appended to the message
  @rtype: string

  """
  summaries = utils.CommaJoin(op.Summary() for op in ops)
  return "%s; opcodes %s" % (msg, summaries)
@staticmethod
def _ResolveJobDependencies(resolve_fn, deps):
  """Resolves relative job IDs in dependencies.

  @type resolve_fn: callable
  @param resolve_fn: Function to resolve a relative job ID; an
      C{IndexError} raised by it is reported as a resolution failure
  @type deps: list
  @param deps: Dependencies, as pairs of (job ID, requested status)
  @rtype: tuple; (boolean, string or list)
  @return: If successful (first tuple item), the returned list contains
    resolved job IDs along with the requested status; if not successful,
    the second element is an error message

  """
  resolved = []

  for (dep_job_id, dep_status) in deps:
    if not ht.TRelativeJobId(dep_job_id):
      # Not a relative ID, taken over unchanged
      resolved.append((dep_job_id, dep_status))
      continue

    assert ht.TInt(dep_job_id) and dep_job_id < 0
    try:
      job_id = resolve_fn(dep_job_id)
    except IndexError:
      # Abort
      return (False, "Unable to resolve relative job ID %s" % dep_job_id)

    resolved.append((job_id, dep_status))

  return (True, resolved)
2268
2269 - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2270 """Create and store multiple jobs. 2271 2272 @see: L{_SubmitJobUnlocked} 2273 2274 """ 2275 results = [] 2276 added_jobs = [] 2277 2278 def resolve_fn(job_idx, reljobid): 2279 assert reljobid < 0 2280 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2281 2282 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): 2283 for op in ops: 2284 if getattr(op, opcodes.DEPEND_ATTR, None): 2285 (status, data) = \ 2286 self._ResolveJobDependencies(compat.partial(resolve_fn, idx), 2287 op.depends) 2288 if not status: 2289 # Abort resolving dependencies 2290 assert ht.TNonEmptyString(data), "No error message" 2291 break 2292 # Use resolved dependencies 2293 op.depends = data 2294 else: 2295 try: 2296 job = self._SubmitJobUnlocked(job_id, ops) 2297 except errors.GenericError, err: 2298 status = False 2299 data = self._FormatSubmitError(str(err), ops) 2300 else: 2301 status = True 2302 data = job_id 2303 added_jobs.append(job) 2304 2305 results.append((status, data)) 2306 2307 return (results, added_jobs)
@locking.ssynchronized(_LOCK)
def _EnqueueJobs(self, jobs):
  """Lock-acquiring wrapper around L{_EnqueueJobsUnlocked}.

  @type jobs: list
  @param jobs: List of all jobs

  """
  result = self._EnqueueJobsUnlocked(jobs)
  return result
2318
def _EnqueueJobsUnlocked(self, jobs):
  """Adds jobs to the worker pool's queue.

  Each job becomes one task, with the job's calculated priority and its
  ID attribute as task ID. The queue lock must be held exclusively.

  @type jobs: list
  @param jobs: List of all jobs

  """
  assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

  task_args = [(job, ) for job in jobs]
  priorities = [job.CalcPriority() for job in jobs]
  task_ids = [_GetIdAttr(job) for job in jobs]

  self._wpool.AddManyTasks(task_args, priority=priorities, task_id=task_ids)
2330
2331 - def _GetJobStatusForDependencies(self, job_id):
2332 """Gets the status of a job for dependencies. 2333 2334 @type job_id: int 2335 @param job_id: Job ID 2336 @raise errors.JobLost: If job can't be found 2337 2338 """ 2339 # Not using in-memory cache as doing so would require an exclusive lock 2340 2341 # Try to load from disk 2342 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2343 2344 assert not job.writable, "Got writable job" # pylint: disable=E1101 2345 2346 if job: 2347 return job.CalcStatus() 2348 2349 raise errors.JobLost("Job %s not found" % job_id)
@_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True):
  """Update a job's on disk storage.

  After a job has been modified, this function needs to be called in
  order to write the changes to disk and replicate them to the other
  nodes.

  @type job: L{_QueuedJob}
  @param job: the changed job
  @type replicate: boolean
  @param replicate: whether to replicate the change to remote nodes

  """
  if __debug__:
    # A job has an end timestamp if and only if it is finalized
    is_final = job.CalcStatus() in constants.JOBS_FINALIZED
    has_end = job.end_timestamp is not None
    assert is_final == has_end
    assert job.writable, "Can't update read-only job"
    assert not job.archived, "Can't update archived job"

  job_file = self._GetJobPath(job.id)
  serialized = serializer.DumpJson(job.Serialize())
  logging.debug("Writing job %s to %s", job.id, job_file)
  self._UpdateJobQueueFile(job_file, serialized, replicate)
2375
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Waits for changes in a job.

  The waiting itself is delegated to L{_WaitForJobChangesHelper}, which
  is given the job's file path and a function loading the job read-only
  from disk.

  @type job_id: int
  @param job_id: Job identifier
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @rtype: tuple (job info, log entries)
  @return: a tuple of the job information as required via
      the fields parameter, and the log entries as a list

      if the job has not changed and the timeout has expired,
      we instead return a special value,
      L{constants.JOB_NOTCHANGED}, which should be interpreted
      as such by the clients

  """
  load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                           writable=False)
  wait_helper = _WaitForJobChangesHelper()
  job_path = self._GetJobPath(job_id)

  return wait_helper(job_path, load_fn, fields, prev_job_info,
                     prev_log_serial, timeout)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def CancelJob(self, job_id):
  """Cancels a job.

  This will only succeed if the job has not started yet.

  @type job_id: int
  @param job_id: job ID of job to be cancelled.
  @return: the result of L{_ModifyJobUnlocked}, a tuple of
      (status boolean, message string)

  """
  logging.info("Cancelling job %s", job_id)

  def _cancel(job):
    return job.Cancel()

  return self._ModifyJobUnlocked(job_id, _cancel)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ChangeJobPriority(self, job_id, priority):
  """Changes a job's priority.

  @type job_id: int
  @param job_id: ID of the job whose priority should be changed
  @type priority: int
  @param priority: New priority
  @return: the result of L{_ModifyJobUnlocked}, a tuple of
      (status boolean, message string)
  @raise errors.GenericError: if the priority is not one of the values
      allowed for submission

  """
  logging.info("Changing priority of job %s to %s", job_id, priority)

  if priority not in constants.OP_PRIO_SUBMIT_VALID:
    valid_prios = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
    raise errors.GenericError("Invalid priority %s, allowed are %s" %
                              (priority, valid_prios))

  def _apply_priority(job):
    (success, msg) = job.ChangePriority(priority)

    if not success:
      return (success, msg)

    # Keep the worker pool's view of the task in sync with the job
    try:
      self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
    except workerpool.NoSuchTask:
      logging.debug("Job %s is not in workerpool at this time", job.id)

    return (success, msg)

  return self._ModifyJobUnlocked(job_id, _apply_priority)
2454 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2455 """Modifies a job. 2456 2457 @type job_id: int 2458 @param job_id: Job ID 2459 @type mod_fn: callable 2460 @param mod_fn: Modifying function, receiving job object as parameter, 2461 returning tuple of (status boolean, message string) 2462 2463 """ 2464 job = self._LoadJobUnlocked(job_id) 2465 if not job: 2466 logging.debug("Job %s not found", job_id) 2467 return (False, "Job %s not found" % job_id) 2468 2469 assert job.writable, "Can't modify read-only job" 2470 assert not job.archived, "Can't modify archived job" 2471 2472 (success, msg) = mod_fn(job) 2473 2474 if success: 2475 # If the job was finalized (e.g. cancelled), this is the final write 2476 # allowed. The job can be archived anytime. 2477 self.UpdateJobUnlocked(job) 2478 2479 return (success, msg)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
  """Archives jobs.

  Jobs which are not yet finalized are skipped; the files of all others
  are renamed from their queue path to their archive path.

  @type jobs: list of L{_QueuedJob}
  @param jobs: Job objects
  @rtype: int
  @return: Number of jobs selected for archival (the outcome of the
      rename itself is not checked, see the TODO below)

  """
  archive_jobs = []
  rename_files = []
  for job in jobs:
    assert job.writable, "Can't archive read-only job"
    assert not job.archived, "Can't archive archived job"

    if job.CalcStatus() not in constants.JOBS_FINALIZED:
      logging.debug("Job %s is not yet done", job.id)
      continue

    archive_jobs.append(job)

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)
    rename_files.append((old, new))

  # TODO: What if 1..n files fail to rename?
  self._RenameFilesUnlocked(rename_files)

  logging.debug("Successfully archived job(s) %s",
                utils.CommaJoin(job.id for job in archive_jobs))

  # Since we haven't quite checked, above, if we succeeded or failed renaming
  # the files, we update the cached queue size from the filesystem. When we
  # get around to fix the TODO: above, we can use the number of actually
  # archived jobs to fix this.
  self._UpdateQueueSizeUnlocked()
  return len(archive_jobs)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ArchiveJob(self, job_id):
  """Archives a job.

  This is just a wrapper over L{_ArchiveJobsUnlocked}.

  @type job_id: int
  @param job_id: Job ID of job to be archived.
  @rtype: bool
  @return: Whether job was archived

  """
  logging.info("Archiving job %s", job_id)

  job = self._LoadJobUnlocked(job_id)
  if job:
    archived_count = self._ArchiveJobsUnlocked([job])
    return archived_count == 1

  logging.debug("Job %s not found", job_id)
  return False
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout):
  """Archives all jobs based on age.

  The method will archive all jobs which are older than the age
  parameter. A job's age is based on its end timestamp; for jobs
  without one the start timestamp is used and, failing that, the
  received timestamp. The special '-1' age will cause archival of all
  jobs (that are not running or queued).

  @type age: int
  @param age: the minimum age in seconds
  @type timeout: number
  @param timeout: time in seconds after which no further jobs are
      examined
  @rtype: tuple; (int, int)
  @return: number of archived jobs and number of jobs which were not
      examined because the timeout was reached

  """
  logging.info("Archiving jobs with age more than %s seconds", age)

  now = time.time()
  end_time = now + timeout
  archived_count = 0
  last_touched = 0

  all_job_ids = self._GetJobIDsUnlocked()
  pending = []
  for idx, job_id in enumerate(all_job_ids):
    # Not optimal because jobs could be pending
    # TODO: Measure average duration for job archival and take number of
    # pending jobs into account.
    if time.time() > end_time:
      break

    # Only count the job as examined once the timeout check has passed;
    # otherwise the job we stopped at would be missing from the number of
    # unchecked jobs returned to the caller
    last_touched = idx + 1

    # Returns None if the job failed to load
    job = self._LoadJobUnlocked(job_id)
    if job:
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # The first timestamp element is compared against time.time()
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

    # Archive 10 jobs at a time
    if len(pending) >= 10:
      archived_count += self._ArchiveJobsUnlocked(pending)
      pending = []

  if pending:
    archived_count += self._ArchiveJobsUnlocked(pending)

  return (archived_count, len(all_job_ids) - last_touched)
2597
def _Query(self, fields, qfilter):
  """Prepares a job query and loads the jobs it needs from disk.

  @param fields: List of wanted fields
  @param qfilter: Query filter
  @rtype: tuple
  @return: the query object, a list of (job ID, job object or None)
      pairs, and whether all jobs were listed (as opposed to the job IDs
      being determined by the query itself)

  """
  qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                     namefield="id")

  # Archived jobs are only looked at if the "archived" field is referenced
  # either as a requested field or in the filter. By default archived jobs
  # are ignored.
  include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

  job_ids = qobj.RequestedNames()
  list_all = (job_ids is None)

  if list_all:
    # Since files are added to/removed from the queue atomically, there's no
    # risk of getting the job ids in an inconsistent state.
    job_ids = self._GetJobIDsUnlocked(archived=include_archived)

  loaded = []

  for job_id in job_ids:
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
    if job is None and list_all:
      # When listing everything, jobs which can't be loaded are left out;
      # explicitly requested IDs are always reported, even without a job
      continue
    loaded.append((job_id, job))

  return (qobj, loaded, list_all)
2624
def QueryJobs(self, fields, qfilter):
  """Returns a list of jobs in queue.

  @type fields: sequence
  @param fields: List of wanted fields
  @type qfilter: None or query2 filter (list)
  @param qfilter: Query filter
  @return: the response built by C{query.GetQueryResponse}, not sorted
      by name

  """
  (qobj, job_data, _) = self._Query(fields, qfilter)
  response = query.GetQueryResponse(qobj, job_data, sort_by_name=False)
  return response
2637
def OldStyleQueryJobs(self, job_ids, fields):
  """Returns a list of jobs in queue.

  @type job_ids: list or None
  @param job_ids: sequence of job identifiers (anything accepted by
      C{int()}); None is treated like an empty sequence.
      NOTE(review): presumably C{qlang.MakeSimpleFilter} produces no
      filter for an empty list, i.e. all jobs are returned - confirm
  @type fields: list
  @param fields: names of fields to return
  @rtype: list
  @return: list one element per job, each element being list with
      the requested fields

  """
  # backwards compat: IDs may not arrive as integers, and None (documented
  # as "all jobs") must not be iterated over
  job_ids = [int(jid) for jid in (job_ids or [])]
  qfilter = qlang.MakeSimpleFilter("id", job_ids)

  (qobj, ctx, _) = self._Query(fields, qfilter)

  return qobj.OldStyleQuery(ctx, sort_by_name=False)
@locking.ssynchronized(_LOCK)
def PrepareShutdown(self):
  """Prepare to stop the job queue.

  Disables execution of jobs in the workerpool and returns whether there are
  any jobs currently running. If the latter is the case, the job queue is not
  yet ready for shutdown. Once this function returns C{False}, i.e. nothing
  is running anymore, L{Shutdown} can be called without interfering with any
  job. Queued and unfinished jobs will be resumed next time.

  Once this function has been called no new job submissions will be accepted
  (see L{_RequireNonDrainedQueue}).

  @rtype: bool
  @return: Whether there are any running jobs

  """
  still_accepting = self._accepting_jobs

  if still_accepting:
    # First call: stop accepting jobs ...
    self._accepting_jobs = False

    # ... and tell worker pool to stop processing pending tasks
    self._wpool.SetActive(False)

  return self._wpool.HasRunningTasks()
2682
def AcceptingJobsUnlocked(self):
  """Returns whether jobs are accepted.

  Once L{PrepareShutdown} has been called, no new jobs are accepted and the
  queue is shutting down.

  @rtype: bool
  @return: the current value of the "accepting jobs" flag

  """
  accepting = self._accepting_jobs
  return accepting
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Shutdown(self):
  """Stops the job queue.

  This terminates all the worker threads and closes the queue's file
  lock.

  """
  self._wpool.TerminateWorkers()

  # Close the queue lock and drop the reference to it
  queue_lock = self._queue_filelock
  queue_lock.Close()
  self._queue_filelock = None
2706