
Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  34  used by all other classes in this module. 
  35   
  36  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  37      processing jobs 
  38   
  39  """ 
  40   
  41  import logging 
  42  import errno 
  43  import time 
  44  import weakref 
  45  import threading 
  46  import itertools 
  47  import operator 
  48   
  49  try: 
  50    # pylint: disable=E0611 
  51    from pyinotify import pyinotify 
  52  except ImportError: 
  53    import pyinotify 
  54   
  55  from ganeti import asyncnotifier 
  56  from ganeti import constants 
  57  from ganeti import serializer 
  58  from ganeti import workerpool 
  59  from ganeti import locking 
  60  from ganeti import luxi 
  61  from ganeti import opcodes 
  62  from ganeti import opcodes_base 
  63  from ganeti import errors 
  64  from ganeti import mcpu 
  65  from ganeti import utils 
  66  from ganeti import jstore 
  67  import ganeti.rpc.node as rpc 
  68  from ganeti import runtime 
  69  from ganeti import netutils 
  70  from ganeti import compat 
  71  from ganeti import ht 
  72  from ganeti import query 
  73  from ganeti import qlang 
  74  from ganeti import pathutils 
  75  from ganeti import vcluster 
  76   
  77   
  78  JOBQUEUE_THREADS = 25 
  79   
  80  # member lock names to be passed to @ssynchronized decorator 
  81  _LOCK = "_lock" 
  82  _QUEUE = "_queue" 
  83   
  84  #: Retrieves "id" attribute 
  85  _GetIdAttr = operator.attrgetter("id") 
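
A minimal usage sketch, not part of the original module: _GetIdAttr simply
extracts the "id" attribute and is typically used as a sort key for job
objects.

  # Hypothetical stand-in for _QueuedJob, used only to keep the sketch
  # self-contained.
  class _StubJob(object):
    def __init__(self, job_id):
      self.id = job_id

  jobs = [_StubJob(3), _StubJob(1), _StubJob(2)]
  jobs.sort(key=_GetIdAttr)
  assert [job.id for job in jobs] == [1, 2, 3]
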
86 87 88 -class CancelJob(Exception):
89 """Special exception to cancel a job. 90 91 """
92
93 94 -class QueueShutdown(Exception):
95 """Special exception to abort a job when the job queue is shutting down. 96 97 """
98
99 100 -def TimeStampNow():
101 """Returns the current timestamp. 102 103 @rtype: tuple 104 @return: the current time in the (seconds, microseconds) format 105 106 """ 107 return utils.SplitTime(time.time())
108
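
A small illustrative sketch, not part of the module: the returned pair is the
(seconds, microseconds) split performed by utils.SplitTime, so a float
timestamp such as 1234567890.25 is assumed to become (1234567890, 250000).

  (secs, usecs) = TimeStampNow()
  assert secs > 0 and 0 <= usecs < 1000000
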
109 110 -def _CallJqUpdate(runner, names, file_name, content):
111 """Updates job queue file after virtualizing filename. 112 113 """ 114 virt_file_name = vcluster.MakeVirtualPath(file_name) 115 return runner.call_jobqueue_update(names, virt_file_name, content)
116
117 118 -class _SimpleJobQuery:
119 """Wrapper for job queries. 120 121 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. 122 123 """
124 - def __init__(self, fields):
125 """Initializes this class. 126 127 """ 128 self._query = query.Query(query.JOB_FIELDS, fields)
129
130 - def __call__(self, job):
131 """Executes a job query using cached field list. 132 133 """ 134 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
135
136 137 -class _QueuedOpCode(object):
138 """Encapsulates an opcode object. 139 140 @ivar log: holds the execution log and consists of tuples 141 of the form C{(log_serial, timestamp, level, message)} 142 @ivar input: the OpCode we encapsulate 143 @ivar status: the current status 144 @ivar result: the result of the LU execution 145 @ivar start_timestamp: timestamp for the start of the execution 146 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 147 @ivar stop_timestamp: timestamp for the end of the execution 148 149 """ 150 __slots__ = ["input", "status", "result", "log", "priority", 151 "start_timestamp", "exec_timestamp", "end_timestamp", 152 "__weakref__"] 153
154 - def __init__(self, op):
155 """Initializes instances of this class. 156 157 @type op: L{opcodes.OpCode} 158 @param op: the opcode we encapsulate 159 160 """ 161 self.input = op 162 self.status = constants.OP_STATUS_QUEUED 163 self.result = None 164 self.log = [] 165 self.start_timestamp = None 166 self.exec_timestamp = None 167 self.end_timestamp = None 168 169 # Get initial priority (it might change during the lifetime of this opcode) 170 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
171 172 @classmethod
173 - def Restore(cls, state):
174 """Restore the _QueuedOpCode from the serialized form. 175 176 @type state: dict 177 @param state: the serialized state 178 @rtype: _QueuedOpCode 179 @return: a new _QueuedOpCode instance 180 181 """ 182 obj = _QueuedOpCode.__new__(cls) 183 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 184 obj.status = state["status"] 185 obj.result = state["result"] 186 obj.log = state["log"] 187 obj.start_timestamp = state.get("start_timestamp", None) 188 obj.exec_timestamp = state.get("exec_timestamp", None) 189 obj.end_timestamp = state.get("end_timestamp", None) 190 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 191 return obj
192
193 - def Serialize(self):
194 """Serializes this _QueuedOpCode. 195 196 @rtype: dict 197 @return: the dictionary holding the serialized state 198 199 """ 200 return { 201 "input": self.input.__getstate__(), 202 "status": self.status, 203 "result": self.result, 204 "log": self.log, 205 "start_timestamp": self.start_timestamp, 206 "exec_timestamp": self.exec_timestamp, 207 "end_timestamp": self.end_timestamp, 208 "priority": self.priority, 209 }
210
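
A hedged usage sketch, not part of the module: Serialize() and Restore() are
meant to round-trip through the JSON job file representation. OpTestDelay is
used here purely as an example opcode.

  qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
  state = qop.Serialize()                  # plain, JSON-serializable dict
  restored = _QueuedOpCode.Restore(state)
  assert restored.status == constants.OP_STATUS_QUEUED
  assert restored.priority == qop.priority
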
211 212 -class _QueuedJob(object):
213 """In-memory job representation. 214 215 This is what we use to track the user-submitted jobs. Locking must 216 be taken care of by users of this class. 217 218 @type queue: L{JobQueue} 219 @ivar queue: the parent queue 220 @ivar id: the job ID 221 @type ops: list 222 @ivar ops: the list of _QueuedOpCode that constitute the job 223 @type log_serial: int 224 @ivar log_serial: holds the index for the next log entry 225 @ivar received_timestamp: the timestamp for when the job was received 226 @ivar start_timestmap: the timestamp for start of execution 227 @ivar end_timestamp: the timestamp for end of execution 228 @ivar writable: Whether the job is allowed to be modified 229 230 """ 231 # pylint: disable=W0212 232 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 233 "received_timestamp", "start_timestamp", "end_timestamp", 234 "__weakref__", "processor_lock", "writable", "archived"] 235
236 - def AddReasons(self, pickup=False):
237 """Extend the reason trail 238 239 Add the reason for all the opcodes of this job to be executed. 240 241 """ 242 count = 0 243 for queued_op in self.ops: 244 op = queued_op.input 245 if pickup: 246 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP 247 else: 248 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE 249 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, 250 reason_src_prefix) 251 reason_text = "job=%d;index=%d" % (self.id, count) 252 reason = getattr(op, "reason", []) 253 reason.append((reason_src, reason_text, utils.EpochNano())) 254 op.reason = reason 255 count = count + 1
256
257 - def __init__(self, queue, job_id, ops, writable):
258 """Constructor for the _QueuedJob. 259 260 @type queue: L{JobQueue} 261 @param queue: our parent queue 262 @type job_id: job_id 263 @param job_id: our job id 264 @type ops: list 265 @param ops: the list of opcodes we hold, which will be encapsulated 266 in _QueuedOpCodes 267 @type writable: bool 268 @param writable: Whether job can be modified 269 270 """ 271 if not ops: 272 raise errors.GenericError("A job needs at least one opcode") 273 274 self.queue = queue 275 self.id = int(job_id) 276 self.ops = [_QueuedOpCode(op) for op in ops] 277 self.AddReasons() 278 self.log_serial = 0 279 self.received_timestamp = TimeStampNow() 280 self.start_timestamp = None 281 self.end_timestamp = None 282 self.archived = False 283 284 self._InitInMemory(self, writable) 285 286 assert not self.archived, "New jobs can not be marked as archived"
287 288 @staticmethod
289 - def _InitInMemory(obj, writable):
290 """Initializes in-memory variables. 291 292 """ 293 obj.writable = writable 294 obj.ops_iter = None 295 obj.cur_opctx = None 296 297 # Read-only jobs are not processed and therefore don't need a lock 298 if writable: 299 obj.processor_lock = threading.Lock() 300 else: 301 obj.processor_lock = None
302
303 - def __repr__(self):
304 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 305 "id=%s" % self.id, 306 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 307 308 return "<%s at %#x>" % (" ".join(status), id(self))
309 310 @classmethod
311 - def Restore(cls, queue, state, writable, archived):
312 """Restore a _QueuedJob from serialized state: 313 314 @type queue: L{JobQueue} 315 @param queue: to which queue the restored job belongs 316 @type state: dict 317 @param state: the serialized state 318 @type writable: bool 319 @param writable: Whether job can be modified 320 @type archived: bool 321 @param archived: Whether job was already archived 322 @rtype: _JobQueue 323 @return: the restored _JobQueue instance 324 325 """ 326 obj = _QueuedJob.__new__(cls) 327 obj.queue = queue 328 obj.id = int(state["id"]) 329 obj.received_timestamp = state.get("received_timestamp", None) 330 obj.start_timestamp = state.get("start_timestamp", None) 331 obj.end_timestamp = state.get("end_timestamp", None) 332 obj.archived = archived 333 334 obj.ops = [] 335 obj.log_serial = 0 336 for op_state in state["ops"]: 337 op = _QueuedOpCode.Restore(op_state) 338 for log_entry in op.log: 339 obj.log_serial = max(obj.log_serial, log_entry[0]) 340 obj.ops.append(op) 341 342 cls._InitInMemory(obj, writable) 343 344 return obj
345
346 - def Serialize(self):
347 """Serialize the _JobQueue instance. 348 349 @rtype: dict 350 @return: the serialized state 351 352 """ 353 return { 354 "id": self.id, 355 "ops": [op.Serialize() for op in self.ops], 356 "start_timestamp": self.start_timestamp, 357 "end_timestamp": self.end_timestamp, 358 "received_timestamp": self.received_timestamp, 359 }
360
361 - def CalcStatus(self):
362 """Compute the status of this job. 363 364 This function iterates over all the _QueuedOpCodes in the job and 365 based on their status, computes the job status. 366 367 The algorithm is: 368 - if we find a cancelled, or finished with error, the job 369 status will be the same 370 - otherwise, the last opcode with the status one of: 371 - waitlock 372 - canceling 373 - running 374 375 will determine the job status 376 377 - otherwise, it means either all opcodes are queued, or success, 378 and the job status will be the same 379 380 @return: the job status 381 382 """ 383 status = constants.JOB_STATUS_QUEUED 384 385 all_success = True 386 for op in self.ops: 387 if op.status == constants.OP_STATUS_SUCCESS: 388 continue 389 390 all_success = False 391 392 if op.status == constants.OP_STATUS_QUEUED: 393 pass 394 elif op.status == constants.OP_STATUS_WAITING: 395 status = constants.JOB_STATUS_WAITING 396 elif op.status == constants.OP_STATUS_RUNNING: 397 status = constants.JOB_STATUS_RUNNING 398 elif op.status == constants.OP_STATUS_CANCELING: 399 status = constants.JOB_STATUS_CANCELING 400 break 401 elif op.status == constants.OP_STATUS_ERROR: 402 status = constants.JOB_STATUS_ERROR 403 # The whole job fails if one opcode failed 404 break 405 elif op.status == constants.OP_STATUS_CANCELED: 406 status = constants.OP_STATUS_CANCELED 407 break 408 409 if all_success: 410 status = constants.JOB_STATUS_SUCCESS 411 412 return status
413
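
An illustrative sketch, not part of the module, of how opcode statuses collapse
into one job status; the objects are built with __new__ only to keep the
example small, since real jobs are created through the queue.

  def _op_with_status(status):             # hypothetical helper for this sketch
    op = _QueuedOpCode.__new__(_QueuedOpCode)
    op.status = status
    return op

  job = _QueuedJob.__new__(_QueuedJob)
  job.ops = [_op_with_status(constants.OP_STATUS_SUCCESS),
             _op_with_status(constants.OP_STATUS_RUNNING),
             _op_with_status(constants.OP_STATUS_QUEUED)]
  assert job.CalcStatus() == constants.JOB_STATUS_RUNNING
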
414 - def CalcPriority(self):
415 """Gets the current priority for this job. 416 417 Only unfinished opcodes are considered. When all are done, the default 418 priority is used. 419 420 @rtype: int 421 422 """ 423 priorities = [op.priority for op in self.ops 424 if op.status not in constants.OPS_FINALIZED] 425 426 if not priorities: 427 # All opcodes are done, assume default priority 428 return constants.OP_PRIO_DEFAULT 429 430 return min(priorities)
431
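
A companion sketch, not part of the module: the job priority is the numerically
smallest (i.e. most urgent) priority among the unfinished opcodes.

  ops = []
  for prio in (constants.OP_PRIO_DEFAULT, constants.OP_PRIO_HIGH):
    op = _QueuedOpCode.__new__(_QueuedOpCode)
    op.status = constants.OP_STATUS_QUEUED
    op.priority = prio
    ops.append(op)

  job = _QueuedJob.__new__(_QueuedJob)
  job.ops = ops
  assert job.CalcPriority() == constants.OP_PRIO_HIGH
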
432 - def GetLogEntries(self, newer_than):
433 """Selectively returns the log entries. 434 435 @type newer_than: None or int 436 @param newer_than: if this is None, return all log entries, 437 otherwise return only the log entries with serial higher 438 than this value 439 @rtype: list 440 @return: the list of the log entries selected 441 442 """ 443 if newer_than is None: 444 serial = -1 445 else: 446 serial = newer_than 447 448 entries = [] 449 for op in self.ops: 450 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 451 452 return entries
453
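
A small sketch, not part of the module: log entries are
(serial, timestamp, type, message) tuples, and only entries with a serial
strictly greater than newer_than are returned.

  op = _QueuedOpCode.__new__(_QueuedOpCode)  # built directly only for the sketch
  op.log = [(1, (0, 0), constants.ELOG_MESSAGE, "first"),
            (2, (0, 0), constants.ELOG_MESSAGE, "second")]

  job = _QueuedJob.__new__(_QueuedJob)
  job.ops = [op]
  assert [msg for (_, _, _, msg) in job.GetLogEntries(1)] == ["second"]
  assert len(job.GetLogEntries(None)) == 2
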
454 - def GetInfo(self, fields):
455 """Returns information about a job. 456 457 @type fields: list 458 @param fields: names of fields to return 459 @rtype: list 460 @return: list with one element for each field 461 @raise errors.OpExecError: when an invalid field 462 has been passed 463 464 """ 465 return _SimpleJobQuery(fields)(self)
466
467 - def MarkUnfinishedOps(self, status, result):
468 """Mark unfinished opcodes with a given status and result. 469 470 This is an utility function for marking all running or waiting to 471 be run opcodes with a given status. Opcodes which are already 472 finalised are not changed. 473 474 @param status: a given opcode status 475 @param result: the opcode result 476 477 """ 478 not_marked = True 479 for op in self.ops: 480 if op.status in constants.OPS_FINALIZED: 481 assert not_marked, "Finalized opcodes found after non-finalized ones" 482 continue 483 op.status = status 484 op.result = result 485 not_marked = False
486
487 - def Finalize(self):
488 """Marks the job as finalized. 489 490 """ 491 self.end_timestamp = TimeStampNow()
492
493 - def Cancel(self):
494 """Marks job as canceled/-ing if possible. 495 496 @rtype: tuple; (bool, string) 497 @return: Boolean describing whether job was successfully canceled or marked 498 as canceling and a text message 499 500 """ 501 status = self.CalcStatus() 502 503 if status == constants.JOB_STATUS_QUEUED: 504 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 505 "Job canceled by request") 506 self.Finalize() 507 return (True, "Job %s canceled" % self.id) 508 509 elif status == constants.JOB_STATUS_WAITING: 510 # The worker will notice the new status and cancel the job 511 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 512 return (True, "Job %s will be canceled" % self.id) 513 514 else: 515 logging.debug("Job %s is no longer waiting in the queue", self.id) 516 return (False, "Job %s is no longer waiting in the queue" % self.id)
517
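
A hedged sketch, not part of the module, of cancelling a still-queued job; the
attributes are filled in by hand only to keep the example self-contained.

  op = _QueuedOpCode.__new__(_QueuedOpCode)
  op.status = constants.OP_STATUS_QUEUED

  job = _QueuedJob.__new__(_QueuedJob)
  job.id = 42                              # hypothetical job ID
  job.ops = [op]
  job.end_timestamp = None

  (success, _) = job.Cancel()
  assert success
  assert op.status == constants.OP_STATUS_CANCELED
  assert job.end_timestamp is not None     # Finalize() stamped the job
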
518 - def ChangePriority(self, priority):
519 """Changes the job priority. 520 521 @type priority: int 522 @param priority: New priority 523 @rtype: tuple; (bool, string) 524 @return: Boolean describing whether job's priority was successfully changed 525 and a text message 526 527 """ 528 status = self.CalcStatus() 529 530 if status in constants.JOBS_FINALIZED: 531 return (False, "Job %s is finished" % self.id) 532 elif status == constants.JOB_STATUS_CANCELING: 533 return (False, "Job %s is cancelling" % self.id) 534 else: 535 assert status in (constants.JOB_STATUS_QUEUED, 536 constants.JOB_STATUS_WAITING, 537 constants.JOB_STATUS_RUNNING) 538 539 changed = False 540 for op in self.ops: 541 if (op.status == constants.OP_STATUS_RUNNING or 542 op.status in constants.OPS_FINALIZED): 543 assert not changed, \ 544 ("Found opcode for which priority should not be changed after" 545 " priority has been changed for previous opcodes") 546 continue 547 548 assert op.status in (constants.OP_STATUS_QUEUED, 549 constants.OP_STATUS_WAITING) 550 551 changed = True 552 553 # Set new priority (doesn't modify opcode input) 554 op.priority = priority 555 556 if changed: 557 return (True, ("Priorities of pending opcodes for job %s have been" 558 " changed to %s" % (self.id, priority))) 559 else: 560 return (False, "Job %s had no pending opcodes" % self.id)
561
562 563 -class _OpExecCallbacks(mcpu.OpExecCbBase):
564
565 - def __init__(self, queue, job, op):
566 """Initializes this class. 567 568 @type queue: L{JobQueue} 569 @param queue: Job queue 570 @type job: L{_QueuedJob} 571 @param job: Job object 572 @type op: L{_QueuedOpCode} 573 @param op: OpCode 574 575 """ 576 super(_OpExecCallbacks, self).__init__() 577 578 assert queue, "Queue is missing" 579 assert job, "Job is missing" 580 assert op, "Opcode is missing" 581 582 self._queue = queue 583 self._job = job 584 self._op = op
585
586 - def _CheckCancel(self):
587 """Raises an exception to cancel the job if asked to. 588 589 """ 590 # Cancel here if we were asked to 591 if self._op.status == constants.OP_STATUS_CANCELING: 592 logging.debug("Canceling opcode") 593 raise CancelJob() 594 595 # See if queue is shutting down 596 if not self._queue.AcceptingJobsUnlocked(): 597 logging.debug("Queue is shutting down") 598 raise QueueShutdown()
599 600 @locking.ssynchronized(_QUEUE, shared=1)
601 - def NotifyStart(self):
602 """Mark the opcode as running, not lock-waiting. 603 604 This is called from the mcpu code as a notifier function, when the LU is 605 finally about to start the Exec() method. Of course, to have end-user 606 visible results, the opcode must be initially (before calling into 607 Processor.ExecOpCode) set to OP_STATUS_WAITING. 608 609 """ 610 assert self._op in self._job.ops 611 assert self._op.status in (constants.OP_STATUS_WAITING, 612 constants.OP_STATUS_CANCELING) 613 614 # Cancel here if we were asked to 615 self._CheckCancel() 616 617 logging.debug("Opcode is now running") 618 619 self._op.status = constants.OP_STATUS_RUNNING 620 self._op.exec_timestamp = TimeStampNow() 621 622 # And finally replicate the job status 623 self._queue.UpdateJobUnlocked(self._job)
624 625 @locking.ssynchronized(_QUEUE, shared=1)
626 - def _AppendFeedback(self, timestamp, log_type, log_msg):
627 """Internal feedback append function, with locks 628 629 """ 630 self._job.log_serial += 1 631 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 632 self._queue.UpdateJobUnlocked(self._job, replicate=False)
633
634 - def Feedback(self, *args):
635 """Append a log entry. 636 637 """ 638 assert len(args) < 3 639 640 if len(args) == 1: 641 log_type = constants.ELOG_MESSAGE 642 log_msg = args[0] 643 else: 644 (log_type, log_msg) = args 645 646 # The time is split to make serialization easier and not lose 647 # precision. 648 timestamp = utils.SplitTime(time.time()) 649 self._AppendFeedback(timestamp, log_type, log_msg)
650
651 - def CurrentPriority(self):
652 """Returns current priority for opcode. 653 654 """ 655 assert self._op.status in (constants.OP_STATUS_WAITING, 656 constants.OP_STATUS_CANCELING) 657 658 # Cancel here if we were asked to 659 self._CheckCancel() 660 661 return self._op.priority
662
663 - def SubmitManyJobs(self, jobs):
664 """Submits jobs for processing. 665 666 See L{JobQueue.SubmitManyJobs}. 667 668 """ 669 # Locking is done in job queue 670 return self._queue.SubmitManyJobs(jobs)
671
672 673 -class _JobChangesChecker(object):
674 - def __init__(self, fields, prev_job_info, prev_log_serial):
675 """Initializes this class. 676 677 @type fields: list of strings 678 @param fields: Fields requested by LUXI client 679 @type prev_job_info: string 680 @param prev_job_info: previous job info, as passed by the LUXI client 681 @type prev_log_serial: string 682 @param prev_log_serial: previous job serial, as passed by the LUXI client 683 684 """ 685 self._squery = _SimpleJobQuery(fields) 686 self._prev_job_info = prev_job_info 687 self._prev_log_serial = prev_log_serial
688
689 - def __call__(self, job):
690 """Checks whether job has changed. 691 692 @type job: L{_QueuedJob} 693 @param job: Job object 694 695 """ 696 assert not job.writable, "Expected read-only job" 697 698 status = job.CalcStatus() 699 job_info = self._squery(job) 700 log_entries = job.GetLogEntries(self._prev_log_serial) 701 702 # Serializing and deserializing data can cause type changes (e.g. from 703 # tuple to list) or precision loss. We're doing it here so that we get 704 # the same modifications as the data received from the client. Without 705 # this, the comparison afterwards might fail without the data being 706 # significantly different. 707 # TODO: we just deserialized from disk, investigate how to make sure that 708 # the job info and log entries are compatible to avoid this further step. 709 # TODO: Doing something like in testutils.py:UnifyValueType might be more 710 # efficient, though floats will be tricky 711 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 712 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 713 714 # Don't even try to wait if the job is no longer running, there will be 715 # no changes. 716 if (status not in (constants.JOB_STATUS_QUEUED, 717 constants.JOB_STATUS_RUNNING, 718 constants.JOB_STATUS_WAITING) or 719 job_info != self._prev_job_info or 720 (log_entries and self._prev_log_serial != log_entries[0][0])): 721 logging.debug("Job %s changed", job.id) 722 return (job_info, log_entries) 723 724 return None
725
726 727 -class _JobFileChangesWaiter(object):
728 - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
729 """Initializes this class. 730 731 @type filename: string 732 @param filename: Path to job file 733 @raises errors.InotifyError: if the notifier cannot be setup 734 735 """ 736 self._wm = _inotify_wm_cls() 737 self._inotify_handler = \ 738 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 739 self._notifier = \ 740 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 741 try: 742 self._inotify_handler.enable() 743 except Exception: 744 # pyinotify doesn't close file descriptors automatically 745 self._notifier.stop() 746 raise
747
748 - def _OnInotify(self, notifier_enabled):
749 """Callback for inotify. 750 751 """ 752 if not notifier_enabled: 753 self._inotify_handler.enable()
754
755 - def Wait(self, timeout):
756 """Waits for the job file to change. 757 758 @type timeout: float 759 @param timeout: Timeout in seconds 760 @return: Whether there have been events 761 762 """ 763 assert timeout >= 0 764 have_events = self._notifier.check_events(timeout * 1000) 765 if have_events: 766 self._notifier.read_events() 767 self._notifier.process_events() 768 return have_events
769
770 - def Close(self):
771 """Closes underlying notifier and its file descriptor. 772 773 """ 774 self._notifier.stop()
775
776 777 -class _JobChangesWaiter(object):
778 - def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
779 """Initializes this class. 780 781 @type filename: string 782 @param filename: Path to job file 783 784 """ 785 self._filewaiter = None 786 self._filename = filename 787 self._waiter_cls = _waiter_cls
788
789 - def Wait(self, timeout):
790 """Waits for a job to change. 791 792 @type timeout: float 793 @param timeout: Timeout in seconds 794 @return: Whether there have been events 795 796 """ 797 if self._filewaiter: 798 return self._filewaiter.Wait(timeout) 799 800 # Lazy setup: Avoid inotify setup cost when job file has already changed. 801 # If this point is reached, return immediately and let caller check the job 802 # file again in case there were changes since the last check. This avoids a 803 # race condition. 804 self._filewaiter = self._waiter_cls(self._filename) 805 806 return True
807
808 - def Close(self):
809 """Closes underlying waiter. 810 811 """ 812 if self._filewaiter: 813 self._filewaiter.Close()
814
815 816 -class _WaitForJobChangesHelper(object):
817 """Helper class using inotify to wait for changes in a job file. 818 819 This class takes a previous job status and serial, and alerts the client when 820 the current job status has changed. 821 822 """ 823 @staticmethod
824 - def _CheckForChanges(counter, job_load_fn, check_fn):
825 if counter.next() > 0: 826 # If this isn't the first check the job is given some more time to change 827 # again. This gives better performance for jobs generating many 828 # changes/messages. 829 time.sleep(0.1) 830 831 job = job_load_fn() 832 if not job: 833 raise errors.JobLost() 834 835 result = check_fn(job) 836 if result is None: 837 raise utils.RetryAgain() 838 839 return result
840
841 - def __call__(self, filename, job_load_fn, 842 fields, prev_job_info, prev_log_serial, timeout, 843 _waiter_cls=_JobChangesWaiter):
844 """Waits for changes on a job. 845 846 @type filename: string 847 @param filename: File on which to wait for changes 848 @type job_load_fn: callable 849 @param job_load_fn: Function to load job 850 @type fields: list of strings 851 @param fields: Which fields to check for changes 852 @type prev_job_info: list or None 853 @param prev_job_info: Last job information returned 854 @type prev_log_serial: int 855 @param prev_log_serial: Last job message serial number 856 @type timeout: float 857 @param timeout: maximum time to wait in seconds 858 859 """ 860 counter = itertools.count() 861 try: 862 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 863 waiter = _waiter_cls(filename) 864 try: 865 return utils.Retry(compat.partial(self._CheckForChanges, 866 counter, job_load_fn, check_fn), 867 utils.RETRY_REMAINING_TIME, timeout, 868 wait_fn=waiter.Wait) 869 finally: 870 waiter.Close() 871 except errors.JobLost: 872 return None 873 except utils.RetryTimeout: 874 return constants.JOB_NOTCHANGED
875
876 877 -def _EncodeOpError(err):
878 """Encodes an error which occurred while processing an opcode. 879 880 """ 881 if isinstance(err, errors.GenericError): 882 to_encode = err 883 else: 884 to_encode = errors.OpExecError(str(err)) 885 886 return errors.EncodeException(to_encode)
887
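
A short sketch, not part of the module: exceptions that are not Ganeti errors
are wrapped in errors.OpExecError before encoding, so the value stored as the
opcode result can be decoded again on the client side.

  encoded = _EncodeOpError(ValueError("disk not found"))  # arbitrary example
  assert errors.GetEncodedError(encoded)   # recognizable as an encoded error
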
888 889 -class _TimeoutStrategyWrapper:
890 - def __init__(self, fn):
891 """Initializes this class. 892 893 """ 894 self._fn = fn 895 self._next = None
896
897 - def _Advance(self):
898 """Gets the next timeout if necessary. 899 900 """ 901 if self._next is None: 902 self._next = self._fn()
903
904 - def Peek(self):
905 """Returns the next timeout. 906 907 """ 908 self._Advance() 909 return self._next
910
911 - def Next(self):
912 """Returns the current timeout and advances the internal state. 913 914 """ 915 self._Advance() 916 result = self._next 917 self._next = None 918 return result
919
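
A usage sketch, not part of the module: the wrapper caches one value from the
supplied function so that Peek() can look at the upcoming timeout without
consuming it, while Next() consumes it; a timeout of None means a blocking
lock acquire.

  timeouts = iter([1.0, 2.0, None])        # hypothetical timeout sequence
  wrapper = _TimeoutStrategyWrapper(lambda: next(timeouts))

  assert wrapper.Peek() == 1.0             # cached, not consumed
  assert wrapper.Next() == 1.0             # consumed now
  assert wrapper.Next() == 2.0
  assert wrapper.Next() is None            # blocking acquire on the last try
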
920 921 -class _OpExecContext:
922 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
923 """Initializes this class. 924 925 """ 926 self.op = op 927 self.index = index 928 self.log_prefix = log_prefix 929 self.summary = op.input.Summary() 930 931 # Create local copy to modify 932 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 933 self.jobdeps = op.input.depends[:] 934 else: 935 self.jobdeps = None 936 937 self._timeout_strategy_factory = timeout_strategy_factory 938 self._ResetTimeoutStrategy()
939
940 - def _ResetTimeoutStrategy(self):
941 """Creates a new timeout strategy. 942 943 """ 944 self._timeout_strategy = \ 945 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
946
947 - def CheckPriorityIncrease(self):
948 """Checks whether priority can and should be increased. 949 950 Called when locks couldn't be acquired. 951 952 """ 953 op = self.op 954 955 # Exhausted all retries and next round should not use blocking acquire 956 # for locks? 957 if (self._timeout_strategy.Peek() is None and 958 op.priority > constants.OP_PRIO_HIGHEST): 959 logging.debug("Increasing priority") 960 op.priority -= 1 961 self._ResetTimeoutStrategy() 962 return True 963 964 return False
965
966 - def GetNextLockTimeout(self):
967 """Returns the next lock acquire timeout. 968 969 """ 970 return self._timeout_strategy.Next()
971
972 973 -class _JobProcessor(object):
974 (DEFER, 975 WAITDEP, 976 FINISHED) = range(1, 4) 977
978 - def __init__(self, queue, opexec_fn, job, 979 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
980 """Initializes this class. 981 982 """ 983 self.queue = queue 984 self.opexec_fn = opexec_fn 985 self.job = job 986 self._timeout_strategy_factory = _timeout_strategy_factory
987 988 @staticmethod
989 - def _FindNextOpcode(job, timeout_strategy_factory):
990 """Locates the next opcode to run. 991 992 @type job: L{_QueuedJob} 993 @param job: Job object 994 @param timeout_strategy_factory: Callable to create new timeout strategy 995 996 """ 997 # Create some sort of a cache to speed up locating next opcode for future 998 # lookups 999 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 1000 # pending and one for processed ops. 1001 if job.ops_iter is None: 1002 job.ops_iter = enumerate(job.ops) 1003 1004 # Find next opcode to run 1005 while True: 1006 try: 1007 (idx, op) = job.ops_iter.next() 1008 except StopIteration: 1009 raise errors.ProgrammerError("Called for a finished job") 1010 1011 if op.status == constants.OP_STATUS_RUNNING: 1012 # Found an opcode already marked as running 1013 raise errors.ProgrammerError("Called for job marked as running") 1014 1015 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 1016 timeout_strategy_factory) 1017 1018 if op.status not in constants.OPS_FINALIZED: 1019 return opctx 1020 1021 # This is a job that was partially completed before master daemon 1022 # shutdown, so it can be expected that some opcodes are already 1023 # completed successfully (if any did error out, then the whole job 1024 # should have been aborted and not resubmitted for processing). 1025 logging.info("%s: opcode %s already processed, skipping", 1026 opctx.log_prefix, opctx.summary)
1027 1028 @staticmethod
1029 - def _MarkWaitlock(job, op):
1030 """Marks an opcode as waiting for locks. 1031 1032 The job's start timestamp is also set if necessary. 1033 1034 @type job: L{_QueuedJob} 1035 @param job: Job object 1036 @type op: L{_QueuedOpCode} 1037 @param op: Opcode object 1038 1039 """ 1040 assert op in job.ops 1041 assert op.status in (constants.OP_STATUS_QUEUED, 1042 constants.OP_STATUS_WAITING) 1043 1044 update = False 1045 1046 op.result = None 1047 1048 if op.status == constants.OP_STATUS_QUEUED: 1049 op.status = constants.OP_STATUS_WAITING 1050 update = True 1051 1052 if op.start_timestamp is None: 1053 op.start_timestamp = TimeStampNow() 1054 update = True 1055 1056 if job.start_timestamp is None: 1057 job.start_timestamp = op.start_timestamp 1058 update = True 1059 1060 assert op.status == constants.OP_STATUS_WAITING 1061 1062 return update
1063 1064 @staticmethod
1065 - def _CheckDependencies(queue, job, opctx):
1066 """Checks if an opcode has dependencies and if so, processes them. 1067 1068 @type queue: L{JobQueue} 1069 @param queue: Queue object 1070 @type job: L{_QueuedJob} 1071 @param job: Job object 1072 @type opctx: L{_OpExecContext} 1073 @param opctx: Opcode execution context 1074 @rtype: bool 1075 @return: Whether opcode will be re-scheduled by dependency tracker 1076 1077 """ 1078 op = opctx.op 1079 1080 result = False 1081 1082 while opctx.jobdeps: 1083 (dep_job_id, dep_status) = opctx.jobdeps[0] 1084 1085 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 1086 dep_status) 1087 assert ht.TNonEmptyString(depmsg), "No dependency message" 1088 1089 logging.info("%s: %s", opctx.log_prefix, depmsg) 1090 1091 if depresult == _JobDependencyManager.CONTINUE: 1092 # Remove dependency and continue 1093 opctx.jobdeps.pop(0) 1094 1095 elif depresult == _JobDependencyManager.WAIT: 1096 # Need to wait for notification, dependency tracker will re-add job 1097 # to workerpool 1098 result = True 1099 break 1100 1101 elif depresult == _JobDependencyManager.CANCEL: 1102 # Job was cancelled, cancel this job as well 1103 job.Cancel() 1104 assert op.status == constants.OP_STATUS_CANCELING 1105 break 1106 1107 elif depresult in (_JobDependencyManager.WRONGSTATUS, 1108 _JobDependencyManager.ERROR): 1109 # Job failed or there was an error, this job must fail 1110 op.status = constants.OP_STATUS_ERROR 1111 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 1112 break 1113 1114 else: 1115 raise errors.ProgrammerError("Unknown dependency result '%s'" % 1116 depresult) 1117 1118 return result
1119
1120 - def _ExecOpCodeUnlocked(self, opctx):
1121 """Processes one opcode and returns the result. 1122 1123 """ 1124 op = opctx.op 1125 1126 assert op.status in (constants.OP_STATUS_WAITING, 1127 constants.OP_STATUS_CANCELING) 1128 1129 # The very last check if the job was cancelled before trying to execute 1130 if op.status == constants.OP_STATUS_CANCELING: 1131 return (constants.OP_STATUS_CANCELING, None) 1132 1133 timeout = opctx.GetNextLockTimeout() 1134 1135 try: 1136 # Make sure not to hold queue lock while calling ExecOpCode 1137 result = self.opexec_fn(op.input, 1138 _OpExecCallbacks(self.queue, self.job, op), 1139 timeout=timeout) 1140 except mcpu.LockAcquireTimeout: 1141 assert timeout is not None, "Received timeout for blocking acquire" 1142 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1143 1144 assert op.status in (constants.OP_STATUS_WAITING, 1145 constants.OP_STATUS_CANCELING) 1146 1147 # Was job cancelled while we were waiting for the lock? 1148 if op.status == constants.OP_STATUS_CANCELING: 1149 return (constants.OP_STATUS_CANCELING, None) 1150 1151 # Queue is shutting down, return to queued 1152 if not self.queue.AcceptingJobsUnlocked(): 1153 return (constants.OP_STATUS_QUEUED, None) 1154 1155 # Stay in waitlock while trying to re-acquire lock 1156 return (constants.OP_STATUS_WAITING, None) 1157 except CancelJob: 1158 logging.exception("%s: Canceling job", opctx.log_prefix) 1159 assert op.status == constants.OP_STATUS_CANCELING 1160 return (constants.OP_STATUS_CANCELING, None) 1161 1162 except QueueShutdown: 1163 logging.exception("%s: Queue is shutting down", opctx.log_prefix) 1164 1165 assert op.status == constants.OP_STATUS_WAITING 1166 1167 # Job hadn't been started yet, so it should return to the queue 1168 return (constants.OP_STATUS_QUEUED, None) 1169 1170 except Exception, err: # pylint: disable=W0703 1171 logging.exception("%s: Caught exception in %s", 1172 opctx.log_prefix, opctx.summary) 1173 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1174 else: 1175 logging.debug("%s: %s successful", 1176 opctx.log_prefix, opctx.summary) 1177 return (constants.OP_STATUS_SUCCESS, result)
1178
1179 - def __call__(self, _nextop_fn=None):
1180 """Continues execution of a job. 1181 1182 @param _nextop_fn: Callback function for tests 1183 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 1184 be deferred and C{WAITDEP} if the dependency manager 1185 (L{_JobDependencyManager}) will re-schedule the job when appropriate 1186 1187 """ 1188 queue = self.queue 1189 job = self.job 1190 1191 logging.debug("Processing job %s", job.id) 1192 1193 queue.acquire(shared=1) 1194 try: 1195 opcount = len(job.ops) 1196 1197 assert job.writable, "Expected writable job" 1198 1199 # Don't do anything for finalized jobs 1200 if job.CalcStatus() in constants.JOBS_FINALIZED: 1201 return self.FINISHED 1202 1203 # Is a previous opcode still pending? 1204 if job.cur_opctx: 1205 opctx = job.cur_opctx 1206 job.cur_opctx = None 1207 else: 1208 if __debug__ and _nextop_fn: 1209 _nextop_fn() 1210 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 1211 1212 op = opctx.op 1213 1214 # Consistency check 1215 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 1216 constants.OP_STATUS_CANCELING) 1217 for i in job.ops[opctx.index + 1:]) 1218 1219 assert op.status in (constants.OP_STATUS_QUEUED, 1220 constants.OP_STATUS_WAITING, 1221 constants.OP_STATUS_CANCELING) 1222 1223 assert (op.priority <= constants.OP_PRIO_LOWEST and 1224 op.priority >= constants.OP_PRIO_HIGHEST) 1225 1226 waitjob = None 1227 1228 if op.status != constants.OP_STATUS_CANCELING: 1229 assert op.status in (constants.OP_STATUS_QUEUED, 1230 constants.OP_STATUS_WAITING) 1231 1232 # Prepare to start opcode 1233 if self._MarkWaitlock(job, op): 1234 # Write to disk 1235 queue.UpdateJobUnlocked(job) 1236 1237 assert op.status == constants.OP_STATUS_WAITING 1238 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1239 assert job.start_timestamp and op.start_timestamp 1240 assert waitjob is None 1241 1242 # Check if waiting for a job is necessary 1243 waitjob = self._CheckDependencies(queue, job, opctx) 1244 1245 assert op.status in (constants.OP_STATUS_WAITING, 1246 constants.OP_STATUS_CANCELING, 1247 constants.OP_STATUS_ERROR) 1248 1249 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1250 constants.OP_STATUS_ERROR)): 1251 logging.info("%s: opcode %s waiting for locks", 1252 opctx.log_prefix, opctx.summary) 1253 1254 assert not opctx.jobdeps, "Not all dependencies were removed" 1255 1256 queue.release() 1257 try: 1258 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1259 finally: 1260 queue.acquire(shared=1) 1261 1262 op.status = op_status 1263 op.result = op_result 1264 1265 assert not waitjob 1266 1267 if op.status in (constants.OP_STATUS_WAITING, 1268 constants.OP_STATUS_QUEUED): 1269 # waiting: Couldn't get locks in time 1270 # queued: Queue is shutting down 1271 assert not op.end_timestamp 1272 else: 1273 # Finalize opcode 1274 op.end_timestamp = TimeStampNow() 1275 1276 if op.status == constants.OP_STATUS_CANCELING: 1277 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1278 for i in job.ops[opctx.index:]) 1279 else: 1280 assert op.status in constants.OPS_FINALIZED 1281 1282 if op.status == constants.OP_STATUS_QUEUED: 1283 # Queue is shutting down 1284 assert not waitjob 1285 1286 finalize = False 1287 1288 # Reset context 1289 job.cur_opctx = None 1290 1291 # In no case must the status be finalized here 1292 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1293 1294 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1295 finalize = False 1296 1297 if not waitjob and 
opctx.CheckPriorityIncrease(): 1298 # Priority was changed, need to update on-disk file 1299 queue.UpdateJobUnlocked(job) 1300 1301 # Keep around for another round 1302 job.cur_opctx = opctx 1303 1304 assert (op.priority <= constants.OP_PRIO_LOWEST and 1305 op.priority >= constants.OP_PRIO_HIGHEST) 1306 1307 # In no case must the status be finalized here 1308 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1309 1310 else: 1311 # Ensure all opcodes so far have been successful 1312 assert (opctx.index == 0 or 1313 compat.all(i.status == constants.OP_STATUS_SUCCESS 1314 for i in job.ops[:opctx.index])) 1315 1316 # Reset context 1317 job.cur_opctx = None 1318 1319 if op.status == constants.OP_STATUS_SUCCESS: 1320 finalize = False 1321 1322 elif op.status == constants.OP_STATUS_ERROR: 1323 # Ensure failed opcode has an exception as its result 1324 assert errors.GetEncodedError(job.ops[opctx.index].result) 1325 1326 to_encode = errors.OpExecError("Preceding opcode failed") 1327 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1328 _EncodeOpError(to_encode)) 1329 finalize = True 1330 1331 # Consistency check 1332 assert compat.all(i.status == constants.OP_STATUS_ERROR and 1333 errors.GetEncodedError(i.result) 1334 for i in job.ops[opctx.index:]) 1335 1336 elif op.status == constants.OP_STATUS_CANCELING: 1337 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1338 "Job canceled by request") 1339 finalize = True 1340 1341 else: 1342 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1343 1344 if opctx.index == (opcount - 1): 1345 # Finalize on last opcode 1346 finalize = True 1347 1348 if finalize: 1349 # All opcodes have been run, finalize job 1350 job.Finalize() 1351 1352 # Write to disk. If the job status is final, this is the final write 1353 # allowed. Once the file has been written, it can be archived anytime. 1354 queue.UpdateJobUnlocked(job) 1355 1356 assert not waitjob 1357 1358 if finalize: 1359 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1360 return self.FINISHED 1361 1362 assert not waitjob or queue.depmgr.JobWaiting(job) 1363 1364 if waitjob: 1365 return self.WAITDEP 1366 else: 1367 return self.DEFER 1368 finally: 1369 assert job.writable, "Job became read-only while being processed" 1370 queue.release()
1371
1372 1373 -def _EvaluateJobProcessorResult(depmgr, job, result):
1374 """Looks at a result from L{_JobProcessor} for a job. 1375 1376 To be used in a L{_JobQueueWorker}. 1377 1378 """ 1379 if result == _JobProcessor.FINISHED: 1380 # Notify waiting jobs 1381 depmgr.NotifyWaiters(job.id) 1382 1383 elif result == _JobProcessor.DEFER: 1384 # Schedule again 1385 raise workerpool.DeferTask(priority=job.CalcPriority()) 1386 1387 elif result == _JobProcessor.WAITDEP: 1388 # No-op, dependency manager will re-schedule 1389 pass 1390 1391 else: 1392 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1393 (result, ))
1394
1395 1396 -class _JobQueueWorker(workerpool.BaseWorker):
1397 """The actual job workers. 1398 1399 """
1400 - def RunTask(self, job): # pylint: disable=W0221
1401 """Job executor. 1402 1403 @type job: L{_QueuedJob} 1404 @param job: the job to be processed 1405 1406 """ 1407 assert job.writable, "Expected writable job" 1408 1409 # Ensure only one worker is active on a single job. If a job registers for 1410 # a dependency job, and the other job notifies before the first worker is 1411 # done, the job can end up in the tasklist more than once. 1412 job.processor_lock.acquire() 1413 try: 1414 return self._RunTaskInner(job) 1415 finally: 1416 job.processor_lock.release()
1417
1418 - def _RunTaskInner(self, job):
1419 """Executes a job. 1420 1421 Must be called with per-job lock acquired. 1422 1423 """ 1424 queue = job.queue 1425 assert queue == self.pool.queue 1426 1427 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1428 setname_fn(None) 1429 1430 proc = mcpu.Processor(queue.context, job.id) 1431 1432 # Create wrapper for setting thread name 1433 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1434 proc.ExecOpCode) 1435 1436 _EvaluateJobProcessorResult(queue.depmgr, job, 1437 _JobProcessor(queue, wrap_execop_fn, job)())
1438 1439 @staticmethod
1440 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1441 """Updates the worker thread name to include a short summary of the opcode. 1442 1443 @param setname_fn: Callable setting worker thread name 1444 @param execop_fn: Callable for executing opcode (usually 1445 L{mcpu.Processor.ExecOpCode}) 1446 1447 """ 1448 setname_fn(op) 1449 try: 1450 return execop_fn(op, *args, **kwargs) 1451 finally: 1452 setname_fn(None)
1453 1454 @staticmethod
1455 - def _GetWorkerName(job, op):
1456 """Sets the worker thread name. 1457 1458 @type job: L{_QueuedJob} 1459 @type op: L{opcodes.OpCode} 1460 1461 """ 1462 parts = ["Job%s" % job.id] 1463 1464 if op: 1465 parts.append(op.TinySummary()) 1466 1467 return "/".join(parts)
1468
1469 1470 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1471 """Simple class implementing a job-processing workerpool. 1472 1473 """
1474 - def __init__(self, queue):
1475 super(_JobQueueWorkerPool, self).__init__("Jq", 1476 JOBQUEUE_THREADS, 1477 _JobQueueWorker) 1478 self.queue = queue
1479
1480 1481 -class _JobDependencyManager:
1482 """Keeps track of job dependencies. 1483 1484 """ 1485 (WAIT, 1486 ERROR, 1487 CANCEL, 1488 CONTINUE, 1489 WRONGSTATUS) = range(1, 6) 1490
1491 - def __init__(self, getstatus_fn, enqueue_fn):
1492 """Initializes this class. 1493 1494 """ 1495 self._getstatus_fn = getstatus_fn 1496 self._enqueue_fn = enqueue_fn 1497 1498 self._waiters = {} 1499 self._lock = locking.SharedLock("JobDepMgr")
1500 1501 @locking.ssynchronized(_LOCK, shared=1)
1502 - def GetLockInfo(self, requested): # pylint: disable=W0613
1503 """Retrieves information about waiting jobs. 1504 1505 @type requested: set 1506 @param requested: Requested information, see C{query.LQ_*} 1507 1508 """ 1509 # No need to sort here, that's being done by the lock manager and query 1510 # library. There are no priorities for notifying jobs, hence all show up as 1511 # one item under "pending". 1512 return [("job/%s" % job_id, None, None, 1513 [("job", [job.id for job in waiters])]) 1514 for job_id, waiters in self._waiters.items() 1515 if waiters]
1516 1517 @locking.ssynchronized(_LOCK, shared=1)
1518 - def JobWaiting(self, job):
1519 """Checks if a job is waiting. 1520 1521 """ 1522 return compat.any(job in jobs 1523 for jobs in self._waiters.values())
1524 1525 @locking.ssynchronized(_LOCK)
1526 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1527 """Checks if a dependency job has the requested status. 1528 1529 If the other job is not yet in a finalized status, the calling job will be 1530 notified (re-added to the workerpool) at a later point. 1531 1532 @type job: L{_QueuedJob} 1533 @param job: Job object 1534 @type dep_job_id: int 1535 @param dep_job_id: ID of dependency job 1536 @type dep_status: list 1537 @param dep_status: Required status 1538 1539 """ 1540 assert ht.TJobId(job.id) 1541 assert ht.TJobId(dep_job_id) 1542 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1543 1544 if job.id == dep_job_id: 1545 return (self.ERROR, "Job can't depend on itself") 1546 1547 # Get status of dependency job 1548 try: 1549 status = self._getstatus_fn(dep_job_id) 1550 except errors.JobLost, err: 1551 return (self.ERROR, "Dependency error: %s" % err) 1552 1553 assert status in constants.JOB_STATUS_ALL 1554 1555 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1556 1557 if status not in constants.JOBS_FINALIZED: 1558 # Register for notification and wait for job to finish 1559 job_id_waiters.add(job) 1560 return (self.WAIT, 1561 "Need to wait for job %s, wanted status '%s'" % 1562 (dep_job_id, dep_status)) 1563 1564 # Remove from waiters list 1565 if job in job_id_waiters: 1566 job_id_waiters.remove(job) 1567 1568 if (status == constants.JOB_STATUS_CANCELED and 1569 constants.JOB_STATUS_CANCELED not in dep_status): 1570 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1571 1572 elif not dep_status or status in dep_status: 1573 return (self.CONTINUE, 1574 "Dependency job %s finished with status '%s'" % 1575 (dep_job_id, status)) 1576 1577 else: 1578 return (self.WRONGSTATUS, 1579 "Dependency job %s finished with status '%s'," 1580 " not one of '%s' as required" % 1581 (dep_job_id, status, utils.CommaJoin(dep_status)))
1582
1583 - def _RemoveEmptyWaitersUnlocked(self):
1584 """Remove all jobs without actual waiters. 1585 1586 """ 1587 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1588 if not waiters]: 1589 del self._waiters[job_id]
1590
1591 - def NotifyWaiters(self, job_id):
1592 """Notifies all jobs waiting for a certain job ID. 1593 1594 @attention: Do not call until L{CheckAndRegister} returned a status other 1595 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1596 @type job_id: int 1597 @param job_id: Job ID 1598 1599 """ 1600 assert ht.TJobId(job_id) 1601 1602 self._lock.acquire() 1603 try: 1604 self._RemoveEmptyWaitersUnlocked() 1605 1606 jobs = self._waiters.pop(job_id, None) 1607 finally: 1608 self._lock.release() 1609 1610 if jobs: 1611 # Re-add jobs to workerpool 1612 logging.debug("Re-adding %s jobs which were waiting for job %s", 1613 len(jobs), job_id) 1614 self._enqueue_fn(jobs)
1615
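
A hedged sketch, not part of the module, wiring the dependency manager to stub
callbacks; it shows the (status, message) pairs CheckAndRegister returns for a
self-dependency and for a dependency that already finished successfully.

  depmgr = _JobDependencyManager(
    lambda job_id: constants.JOB_STATUS_SUCCESS,   # stub status lookup
    lambda jobs: None)                             # stub re-enqueue callback

  job = _QueuedJob.__new__(_QueuedJob)             # minimal stand-in job
  job.id = 17

  (res, _) = depmgr.CheckAndRegister(job, 17, [])
  assert res == _JobDependencyManager.ERROR        # can't depend on itself

  (res, _) = depmgr.CheckAndRegister(job, 18, [])
  assert res == _JobDependencyManager.CONTINUE     # dependency already finished
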
1616 1617 -def _RequireOpenQueue(fn):
1618 """Decorator for "public" functions. 1619 1620 This function should be used for all 'public' functions. That is, 1621 functions usually called from other classes. Note that this should 1622 be applied only to methods (not plain functions), since it expects 1623 that the decorated function is called with a first argument that has 1624 a '_queue_filelock' argument. 1625 1626 @warning: Use this decorator only after locking.ssynchronized 1627 1628 Example:: 1629 @locking.ssynchronized(_LOCK) 1630 @_RequireOpenQueue 1631 def Example(self): 1632 pass 1633 1634 """ 1635 def wrapper(self, *args, **kwargs): 1636 # pylint: disable=W0212 1637 assert self._queue_filelock is not None, "Queue should be open" 1638 return fn(self, *args, **kwargs)
1639 return wrapper 1640
1641 1642 -def _RequireNonDrainedQueue(fn):
1643 """Decorator checking for a non-drained queue. 1644 1645 To be used with functions submitting new jobs. 1646 1647 """ 1648 def wrapper(self, *args, **kwargs): 1649 """Wrapper function. 1650 1651 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1652 1653 """ 1654 # Ok when sharing the big job queue lock, as the drain file is created when 1655 # the lock is exclusive. 1656 # Needs access to protected member, pylint: disable=W0212 1657 if self._drained: 1658 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1659 1660 if not self._accepting_jobs: 1661 raise errors.JobQueueError("Job queue is shutting down, refusing job") 1662 1663 return fn(self, *args, **kwargs)
1664 return wrapper 1665
1666 1667 -class JobQueue(object):
1668 """Queue used to manage the jobs. 1669 1670 """
1671 - def __init__(self, context):
1672 """Constructor for JobQueue. 1673 1674 The constructor will initialize the job queue object and then 1675 start loading the current jobs from disk, either for starting them 1676 (if they were queue) or for aborting them (if they were already 1677 running). 1678 1679 @type context: GanetiContext 1680 @param context: the context object for access to the configuration 1681 data and other ganeti objects 1682 1683 """ 1684 self.context = context 1685 self._memcache = weakref.WeakValueDictionary() 1686 self._my_hostname = netutils.Hostname.GetSysName() 1687 1688 # The Big JobQueue lock. If a code block or method acquires it in shared 1689 # mode safe it must guarantee concurrency with all the code acquiring it in 1690 # shared mode, including itself. In order not to acquire it at all 1691 # concurrency must be guaranteed with all code acquiring it in shared mode 1692 # and all code acquiring it exclusively. 1693 self._lock = locking.SharedLock("JobQueue") 1694 1695 self.acquire = self._lock.acquire 1696 self.release = self._lock.release 1697 1698 # Accept jobs by default 1699 self._accepting_jobs = True 1700 1701 # Initialize the queue, and acquire the filelock. 1702 # This ensures no other process is working on the job queue. 1703 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 1704 1705 # Read serial file 1706 self._last_serial = jstore.ReadSerial() 1707 assert self._last_serial is not None, ("Serial file was modified between" 1708 " check in jstore and here") 1709 1710 # Get initial list of nodes 1711 self._nodes = dict((n.name, n.primary_ip) 1712 for n in self.context.cfg.GetAllNodesInfo().values() 1713 if n.master_candidate) 1714 1715 # Remove master node 1716 self._nodes.pop(self._my_hostname, None) 1717 1718 # TODO: Check consistency across nodes 1719 1720 self._queue_size = None 1721 self._UpdateQueueSizeUnlocked() 1722 assert ht.TInt(self._queue_size) 1723 self._drained = jstore.CheckDrainFlag() 1724 1725 # Job dependencies 1726 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1727 self._EnqueueJobs) 1728 self.context.glm.AddToLockMonitor(self.depmgr) 1729 1730 # Setup worker pool 1731 self._wpool = _JobQueueWorkerPool(self)
1732
1733 - def _PickupJobUnlocked(self, job_id):
1734 """Load a job from the job queue 1735 1736 Pick up a job that already is in the job queue and start/resume it. 1737 1738 """ 1739 job = self._LoadJobUnlocked(job_id) 1740 1741 if job is None: 1742 logging.warning("Job %s could not be read", job_id) 1743 return 1744 1745 job.AddReasons(pickup=True) 1746 1747 status = job.CalcStatus() 1748 if status == constants.JOB_STATUS_QUEUED: 1749 self._EnqueueJobsUnlocked([job]) 1750 logging.info("Restarting job %s", job.id) 1751 1752 elif status in (constants.JOB_STATUS_RUNNING, 1753 constants.JOB_STATUS_WAITING, 1754 constants.JOB_STATUS_CANCELING): 1755 logging.warning("Unfinished job %s found: %s", job.id, job) 1756 1757 if status == constants.JOB_STATUS_WAITING: 1758 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1759 self._EnqueueJobsUnlocked([job]) 1760 logging.info("Restarting job %s", job.id) 1761 else: 1762 to_encode = errors.OpExecError("Unclean master daemon shutdown") 1763 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1764 _EncodeOpError(to_encode)) 1765 job.Finalize() 1766 1767 self.UpdateJobUnlocked(job)
1768 1769 @locking.ssynchronized(_LOCK)
1770 - def PickupJob(self, job_id):
1771 self._PickupJobUnlocked(job_id)
1772
1773 - def _GetRpc(self, address_list):
1774 """Gets RPC runner with context. 1775 1776 """ 1777 return rpc.JobQueueRunner(self.context, address_list)
1778 1779 @locking.ssynchronized(_LOCK) 1780 @_RequireOpenQueue
1781 - def AddNode(self, node):
1782 """Register a new node with the queue. 1783 1784 @type node: L{objects.Node} 1785 @param node: the node object to be added 1786 1787 """ 1788 node_name = node.name 1789 assert node_name != self._my_hostname 1790 1791 # Clean queue directory on added node 1792 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1793 msg = result.fail_msg 1794 if msg: 1795 logging.warning("Cannot cleanup queue directory on node %s: %s", 1796 node_name, msg) 1797 1798 if not node.master_candidate: 1799 # remove if existing, ignoring errors 1800 self._nodes.pop(node_name, None) 1801 # and skip the replication of the job ids 1802 return 1803 1804 # Upload the whole queue excluding archived jobs 1805 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1806 1807 # Upload current serial file 1808 files.append(pathutils.JOB_QUEUE_SERIAL_FILE) 1809 1810 # Static address list 1811 addrs = [node.primary_ip] 1812 1813 for file_name in files: 1814 # Read file content 1815 content = utils.ReadFile(file_name) 1816 1817 result = _CallJqUpdate(self._GetRpc(addrs), [node_name], 1818 file_name, content) 1819 msg = result[node_name].fail_msg 1820 if msg: 1821 logging.error("Failed to upload file %s to node %s: %s", 1822 file_name, node_name, msg) 1823 1824 # Set queue drained flag 1825 result = \ 1826 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], 1827 self._drained) 1828 msg = result[node_name].fail_msg 1829 if msg: 1830 logging.error("Failed to set queue drained flag on node %s: %s", 1831 node_name, msg) 1832 1833 self._nodes[node_name] = node.primary_ip
1834 1835 @locking.ssynchronized(_LOCK) 1836 @_RequireOpenQueue
1837 - def RemoveNode(self, node_name):
1838 """Callback called when removing nodes from the cluster. 1839 1840 @type node_name: str 1841 @param node_name: the name of the node to remove 1842 1843 """ 1844 self._nodes.pop(node_name, None)
1845 1846 @staticmethod
1847 - def _CheckRpcResult(result, nodes, failmsg):
1848 """Verifies the status of an RPC call. 1849 1850 Since we aim to keep consistency should this node (the current 1851 master) fail, we will log errors if our rpc fail, and especially 1852 log the case when more than half of the nodes fails. 1853 1854 @param result: the data as returned from the rpc call 1855 @type nodes: list 1856 @param nodes: the list of nodes we made the call to 1857 @type failmsg: str 1858 @param failmsg: the identifier to be used for logging 1859 1860 """ 1861 failed = [] 1862 success = [] 1863 1864 for node in nodes: 1865 msg = result[node].fail_msg 1866 if msg: 1867 failed.append(node) 1868 logging.error("RPC call %s (%s) failed on node %s: %s", 1869 result[node].call, failmsg, node, msg) 1870 else: 1871 success.append(node) 1872 1873 # +1 for the master node 1874 if (len(success) + 1) < len(failed): 1875 # TODO: Handle failing nodes 1876 logging.error("More than half of the nodes failed")
1877
1878 - def _GetNodeIp(self):
1879 """Helper for returning the node name/ip list. 1880 1881 @rtype: (list, list) 1882 @return: a tuple of two lists, the first one with the node 1883 names and the second one with the node addresses 1884 1885 """ 1886 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1887 name_list = self._nodes.keys() 1888 addr_list = [self._nodes[name] for name in name_list] 1889 return name_list, addr_list
1890
1891 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1892 """Writes a file locally and then replicates it to all nodes. 1893 1894 This function will replace the contents of a file on the local 1895 node and then replicate it to all the other nodes we have. 1896 1897 @type file_name: str 1898 @param file_name: the path of the file to be replicated 1899 @type data: str 1900 @param data: the new contents of the file 1901 @type replicate: boolean 1902 @param replicate: whether to spread the changes to the remote nodes 1903 1904 """ 1905 getents = runtime.GetEnts() 1906 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1907 gid=getents.daemons_gid, 1908 mode=constants.JOB_QUEUE_FILES_PERMS) 1909 1910 if replicate: 1911 names, addrs = self._GetNodeIp() 1912 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1913 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1914
1915 - def _RenameFilesUnlocked(self, rename):
1916 """Renames a file locally and then replicate the change. 1917 1918 This function will rename a file in the local queue directory 1919 and then replicate this rename to all the other nodes we have. 1920 1921 @type rename: list of (old, new) 1922 @param rename: List containing tuples mapping old to new names 1923 1924 """ 1925 # Rename them locally 1926 for old, new in rename: 1927 utils.RenameFile(old, new, mkdir=True) 1928 1929 # ... and on all nodes 1930 names, addrs = self._GetNodeIp() 1931 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1932 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1933 1934 @staticmethod
1935 - def _GetJobPath(job_id):
1936 """Returns the job file for a given job id. 1937 1938 @type job_id: str 1939 @param job_id: the job identifier 1940 @rtype: str 1941 @return: the path to the job file 1942 1943 """ 1944 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1945 1946 @staticmethod
1947 - def _GetArchivedJobPath(job_id):
1948 """Returns the archived job file for a give job id. 1949 1950 @type job_id: str 1951 @param job_id: the job identifier 1952 @rtype: str 1953 @return: the path to the archived job file 1954 1955 """ 1956 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1957 jstore.GetArchiveDirectory(job_id), 1958 "job-%s" % job_id)
1959 1960 @staticmethod
1961 - def _DetermineJobDirectories(archived):
1962 """Build list of directories containing job files. 1963 1964 @type archived: bool 1965 @param archived: Whether to include directories for archived jobs 1966 @rtype: list 1967 1968 """ 1969 result = [pathutils.QUEUE_DIR] 1970 1971 if archived: 1972 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 1973 result.extend(map(compat.partial(utils.PathJoin, archive_path), 1974 utils.ListVisibleFiles(archive_path))) 1975 1976 return result
1977 1978 @classmethod
1979 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1980 """Return all known job IDs. 1981 1982 The method only looks at disk because it's a requirement that all 1983 jobs are present on disk (so in the _memcache we don't have any 1984 extra IDs). 1985 1986 @type sort: boolean 1987 @param sort: perform sorting on the returned job ids 1988 @rtype: list 1989 @return: the list of job IDs 1990 1991 """ 1992 jlist = [] 1993 1994 for path in cls._DetermineJobDirectories(archived): 1995 for filename in utils.ListVisibleFiles(path): 1996 m = constants.JOB_FILE_RE.match(filename) 1997 if m: 1998 jlist.append(int(m.group(1))) 1999 2000 if sort: 2001 jlist.sort() 2002 return jlist
2003
2004 - def _LoadJobUnlocked(self, job_id):
2005 """Loads a job from the disk or memory. 2006 2007 Given a job id, this will return the cached job object if 2008 existing, or try to load the job from the disk. If loading from 2009 disk, it will also add the job to the cache. 2010 2011 @type job_id: int 2012 @param job_id: the job id 2013 @rtype: L{_QueuedJob} or None 2014 @return: either None or the job object 2015 2016 """ 2017 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 2018 2019 job = self._memcache.get(job_id, None) 2020 if job: 2021 logging.debug("Found job %s in memcache", job_id) 2022 assert job.writable, "Found read-only job in memcache" 2023 return job 2024 2025 try: 2026 job = self._LoadJobFromDisk(job_id, False) 2027 if job is None: 2028 return job 2029 except errors.JobFileCorrupted: 2030 old_path = self._GetJobPath(job_id) 2031 new_path = self._GetArchivedJobPath(job_id) 2032 if old_path == new_path: 2033 # job already archived (future case) 2034 logging.exception("Can't parse job %s", job_id) 2035 else: 2036 # non-archived case 2037 logging.exception("Can't parse job %s, will archive.", job_id) 2038 self._RenameFilesUnlocked([(old_path, new_path)]) 2039 return None 2040 2041 assert job.writable, "Job just loaded is not writable" 2042 2043 self._memcache[job_id] = job 2044 logging.debug("Added job %s to the cache", job_id) 2045 return job
2046
2047 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2048 """Load the given job file from disk. 2049 2050 Given a job file, read, load and restore it in a _QueuedJob format. 2051 2052 @type job_id: int 2053 @param job_id: job identifier 2054 @type try_archived: bool 2055 @param try_archived: Whether to try loading an archived job 2056 @rtype: L{_QueuedJob} or None 2057 @return: either None or the job object 2058 2059 """ 2060 path_functions = [(self._GetJobPath, False)] 2061 2062 if try_archived: 2063 path_functions.append((self._GetArchivedJobPath, True)) 2064 2065 raw_data = None 2066 archived = None 2067 2068 for (fn, archived) in path_functions: 2069 filepath = fn(job_id) 2070 logging.debug("Loading job from %s", filepath) 2071 try: 2072 raw_data = utils.ReadFile(filepath) 2073 except EnvironmentError, err: 2074 if err.errno != errno.ENOENT: 2075 raise 2076 else: 2077 break 2078 2079 if not raw_data: 2080 return None 2081 2082 if writable is None: 2083 writable = not archived 2084 2085 try: 2086 data = serializer.LoadJson(raw_data) 2087 job = _QueuedJob.Restore(self, data, writable, archived) 2088 except Exception, err: # pylint: disable=W0703 2089 raise errors.JobFileCorrupted(err) 2090 2091 return job
2092
2093 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2094 """Load the given job file from disk. 2095 2096 Given a job file, read, load and restore it in a _QueuedJob format. 2097 In case of error reading the job, it gets returned as None, and the 2098 exception is logged. 2099 2100 @type job_id: int 2101 @param job_id: job identifier 2102 @type try_archived: bool 2103 @param try_archived: Whether to try loading an archived job 2104 @rtype: L{_QueuedJob} or None 2105 @return: either None or the job object 2106 2107 """ 2108 try: 2109 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 2110 except (errors.JobFileCorrupted, EnvironmentError): 2111 logging.exception("Can't load/parse job %s", job_id) 2112 return None
2113
2114 - def _UpdateQueueSizeUnlocked(self):
2115 """Update the queue size. 2116 2117 """ 2118 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2119 2120 @locking.ssynchronized(_LOCK) 2121 @_RequireOpenQueue
2122 - def SetDrainFlag(self, drain_flag):
2123 """Sets the drain flag for the queue. 2124 2125 @type drain_flag: boolean 2126 @param drain_flag: Whether to set or unset the drain flag 2127 2128 """ 2129 # Change flag locally 2130 jstore.SetDrainFlag(drain_flag) 2131 2132 self._drained = drain_flag 2133 2134 # ... and on all nodes 2135 (names, addrs) = self._GetNodeIp() 2136 result = \ 2137 self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) 2138 self._CheckRpcResult(result, self._nodes, 2139 "Setting queue drain flag to %s" % drain_flag) 2140 2141 return True
2142 2143 @classmethod
2144 - def SubmitJob(cls, ops):
2145 """Create and store a new job. 2146 2147 """ 2148 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
2149 2150 @classmethod
2151 - def SubmitJobToDrainedQueue(cls, ops):
2152 """Forcefully create and store a new job. 2153 2154 Do so, even if the job queue is drained. 2155 2156 """ 2157 return luxi.Client(address=pathutils.QUERY_SOCKET)\ 2158 .SubmitJobToDrainedQueue(ops)
2159 2160 @classmethod
2161 - def SubmitManyJobs(cls, jobs):
2162 """Create and store multiple jobs. 2163 2164 """ 2165 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
2166 2167 @staticmethod
2168 - def _FormatSubmitError(msg, ops):
2169 """Formats errors which occurred while submitting a job. 2170 2171 """ 2172 return ("%s; opcodes %s" % 2173 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2174 2175 @staticmethod
2176 - def _ResolveJobDependencies(resolve_fn, deps):
2177 """Resolves relative job IDs in dependencies. 2178 2179 @type resolve_fn: callable 2180 @param resolve_fn: Function to resolve a relative job ID 2181 @type deps: list 2182 @param deps: Dependencies 2183 @rtype: tuple; (boolean, string or list) 2184 @return: If successful (first tuple item), the returned list contains 2185 resolved job IDs along with the requested status; if not successful, 2186 the second element is an error message 2187 2188 """ 2189 result = [] 2190 2191 for (dep_job_id, dep_status) in deps: 2192 if ht.TRelativeJobId(dep_job_id): 2193 assert ht.TInt(dep_job_id) and dep_job_id < 0 2194 try: 2195 job_id = resolve_fn(dep_job_id) 2196 except IndexError: 2197 # Abort 2198 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 2199 else: 2200 job_id = dep_job_id 2201 2202 result.append((job_id, dep_status)) 2203 2204 return (True, result)
2205 2206 @locking.ssynchronized(_LOCK)
2207 - def _EnqueueJobs(self, jobs):
2208 """Helper function to add jobs to worker pool's queue. 2209 2210 @type jobs: list 2211 @param jobs: List of all jobs 2212 2213 """ 2214 return self._EnqueueJobsUnlocked(jobs)
2215
2216 - def _EnqueueJobsUnlocked(self, jobs):
2217 """Helper function to add jobs to worker pool's queue. 2218 2219 @type jobs: list 2220 @param jobs: List of all jobs 2221 2222 """ 2223 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 2224 self._wpool.AddManyTasks([(job, ) for job in jobs], 2225 priority=[job.CalcPriority() for job in jobs], 2226 task_id=map(_GetIdAttr, jobs))
2227
2228 - def _GetJobStatusForDependencies(self, job_id):
2229 """Gets the status of a job for dependencies. 2230 2231 @type job_id: int 2232 @param job_id: Job ID 2233 @raise errors.JobLost: If job can't be found 2234 2235 """ 2236 # Not using in-memory cache as doing so would require an exclusive lock 2237 2238 # Try to load from disk 2239 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2240 2241 assert not job.writable, "Got writable job" # pylint: disable=E1101 2242 2243 if job: 2244 return job.CalcStatus() 2245 2246 raise errors.JobLost("Job %s not found" % job_id)
2247 2248 @_RequireOpenQueue
2249 - def UpdateJobUnlocked(self, job, replicate=True):
2250 """Update a job's on disk storage. 2251 2252 After a job has been modified, this function needs to be called in 2253 order to write the changes to disk and replicate them to the other 2254 nodes. 2255 2256 @type job: L{_QueuedJob} 2257 @param job: the changed job 2258 @type replicate: boolean 2259 @param replicate: whether to replicate the change to remote nodes 2260 2261 """ 2262 if __debug__: 2263 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 2264 assert (finalized ^ (job.end_timestamp is None)) 2265 assert job.writable, "Can't update read-only job" 2266 assert not job.archived, "Can't update archived job" 2267 2268 filename = self._GetJobPath(job.id) 2269 data = serializer.DumpJson(job.Serialize()) 2270 logging.debug("Writing job %s to %s", job.id, filename) 2271 self._UpdateJobQueueFile(filename, data, replicate)
2272
2273 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 2274 timeout):
2275 """Waits for changes in a job. 2276 2277 @type job_id: int 2278 @param job_id: Job identifier 2279 @type fields: list of strings 2280 @param fields: Which fields to check for changes 2281 @type prev_job_info: list or None 2282 @param prev_job_info: Last job information returned 2283 @type prev_log_serial: int 2284 @param prev_log_serial: Last job message serial number 2285 @type timeout: float 2286 @param timeout: maximum time to wait in seconds 2287 @rtype: tuple (job info, log entries) 2288 @return: a tuple of the job information as required via 2289 the fields parameter, and the log entries as a list 2290 2291 if the job has not changed and the timeout has expired, 2292 we instead return a special value, 2293 L{constants.JOB_NOTCHANGED}, which should be interpreted 2294 as such by the clients 2295 2296 """ 2297 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, 2298 writable=False) 2299 2300 helper = _WaitForJobChangesHelper() 2301 2302 return helper(self._GetJobPath(job_id), load_fn, 2303 fields, prev_job_info, prev_log_serial, timeout)
2304 2305 @locking.ssynchronized(_LOCK) 2306 @_RequireOpenQueue
2307 - def CancelJob(self, job_id):
2308 """Cancels a job. 2309 2310 This will only succeed if the job has not started yet. 2311 2312 @type job_id: int 2313 @param job_id: job ID of job to be cancelled. 2314 2315 """ 2316 logging.info("Cancelling job %s", job_id) 2317 2318 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2319 2320 @locking.ssynchronized(_LOCK) 2321 @_RequireOpenQueue
2322 - def ChangeJobPriority(self, job_id, priority):
2323 """Changes a job's priority. 2324 2325 @type job_id: int 2326 @param job_id: ID of the job whose priority should be changed 2327 @type priority: int 2328 @param priority: New priority 2329 2330 """ 2331 logging.info("Changing priority of job %s to %s", job_id, priority) 2332 2333 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2334 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2335 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2336 (priority, allowed)) 2337 2338 def fn(job): 2339 (success, msg) = job.ChangePriority(priority) 2340 2341 if success: 2342 try: 2343 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2344 except workerpool.NoSuchTask: 2345 logging.debug("Job %s is not in workerpool at this time", job.id) 2346 2347 return (success, msg)
2348 2349 return self._ModifyJobUnlocked(job_id, fn)
2350
2351 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2352 """Modifies a job. 2353 2354 @type job_id: int 2355 @param job_id: Job ID 2356 @type mod_fn: callable 2357 @param mod_fn: Modifying function, receiving job object as parameter, 2358 returning tuple of (status boolean, message string) 2359 2360 """ 2361 job = self._LoadJobUnlocked(job_id) 2362 if not job: 2363 logging.debug("Job %s not found", job_id) 2364 return (False, "Job %s not found" % job_id) 2365 2366 assert job.writable, "Can't modify read-only job" 2367 assert not job.archived, "Can't modify archived job" 2368 2369 (success, msg) = mod_fn(job) 2370 2371 if success: 2372 # If the job was finalized (e.g. cancelled), this is the final write 2373 # allowed. The job can be archived anytime. 2374 self.UpdateJobUnlocked(job) 2375 2376 return (success, msg)
2377 2378 @_RequireOpenQueue
2379 - def _ArchiveJobsUnlocked(self, jobs):
2380 """Archives jobs. 2381 2382 @type jobs: list of L{_QueuedJob} 2383 @param jobs: Job objects 2384 @rtype: int 2385 @return: Number of archived jobs 2386 2387 """ 2388 archive_jobs = [] 2389 rename_files = [] 2390 for job in jobs: 2391 assert job.writable, "Can't archive read-only job" 2392 assert not job.archived, "Can't cancel archived job" 2393 2394 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2395 logging.debug("Job %s is not yet done", job.id) 2396 continue 2397 2398 archive_jobs.append(job) 2399 2400 old = self._GetJobPath(job.id) 2401 new = self._GetArchivedJobPath(job.id) 2402 rename_files.append((old, new)) 2403 2404 # TODO: What if 1..n files fail to rename? 2405 self._RenameFilesUnlocked(rename_files) 2406 2407 logging.debug("Successfully archived job(s) %s", 2408 utils.CommaJoin(job.id for job in archive_jobs)) 2409 2410 # Since we haven't quite checked, above, if we succeeded or failed renaming 2411 # the files, we update the cached queue size from the filesystem. When we 2412 # get around to fix the TODO: above, we can use the number of actually 2413 # archived jobs to fix this. 2414 self._UpdateQueueSizeUnlocked() 2415 return len(archive_jobs)
2416 2417 @locking.ssynchronized(_LOCK) 2418 @_RequireOpenQueue
2419 - def ArchiveJob(self, job_id):
2420 """Archives a job. 2421 2422 This is just a wrapper over L{_ArchiveJobsUnlocked}. 2423 2424 @type job_id: int 2425 @param job_id: Job ID of job to be archived. 2426 @rtype: bool 2427 @return: Whether job was archived 2428 2429 """ 2430 logging.info("Archiving job %s", job_id) 2431 2432 job = self._LoadJobUnlocked(job_id) 2433 if not job: 2434 logging.debug("Job %s not found", job_id) 2435 return False 2436 2437 return self._ArchiveJobsUnlocked([job]) == 1
2438 2439 @locking.ssynchronized(_LOCK) 2440 @_RequireOpenQueue
2441 - def AutoArchiveJobs(self, age, timeout):
2442 """Archives all jobs based on age. 2443 2444 The method will archive all jobs which are older than the age 2445 parameter. For jobs that don't have an end timestamp, the start 2446 timestamp will be considered. The special '-1' age will cause 2447 archival of all jobs (that are not running or queued). 2448 2449 @type age: int 2450 @param age: the minimum age in seconds 2451 2452 """ 2453 logging.info("Archiving jobs with age more than %s seconds", age) 2454 2455 now = time.time() 2456 end_time = now + timeout 2457 archived_count = 0 2458 last_touched = 0 2459 2460 all_job_ids = self._GetJobIDsUnlocked() 2461 pending = [] 2462 for idx, job_id in enumerate(all_job_ids): 2463 last_touched = idx + 1 2464 2465 # Not optimal because jobs could be pending 2466 # TODO: Measure average duration for job archival and take number of 2467 # pending jobs into account. 2468 if time.time() > end_time: 2469 break 2470 2471 # Returns None if the job failed to load 2472 job = self._LoadJobUnlocked(job_id) 2473 if job: 2474 if job.end_timestamp is None: 2475 if job.start_timestamp is None: 2476 job_age = job.received_timestamp 2477 else: 2478 job_age = job.start_timestamp 2479 else: 2480 job_age = job.end_timestamp 2481 2482 if age == -1 or now - job_age[0] > age: 2483 pending.append(job) 2484 2485 # Archive 10 jobs at a time 2486 if len(pending) >= 10: 2487 archived_count += self._ArchiveJobsUnlocked(pending) 2488 pending = [] 2489 2490 if pending: 2491 archived_count += self._ArchiveJobsUnlocked(pending) 2492 2493 return (archived_count, len(all_job_ids) - last_touched)
2494
2495 - def _Query(self, fields, qfilter):
2496 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2497 namefield="id") 2498 2499 # Archived jobs are only looked at if the "archived" field is referenced 2500 # either as a requested field or in the filter. By default archived jobs 2501 # are ignored. 2502 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2503 2504 job_ids = qobj.RequestedNames() 2505 2506 list_all = (job_ids is None) 2507 2508 if list_all: 2509 # Since files are added to/removed from the queue atomically, there's no 2510 # risk of getting the job ids in an inconsistent state. 2511 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2512 2513 jobs = [] 2514 2515 for job_id in job_ids: 2516 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2517 if job is not None or not list_all: 2518 jobs.append((job_id, job)) 2519 2520 return (qobj, jobs, list_all)
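  # Editor's sketch (not part of the module): archived jobs only show up when
  # the "archived" field is referenced in the request. The field list and the
  # query2-style filter below are assumptions for illustration only:
  #
  #   (qobj, jobs, _) = queue._Query(["id", "status", "archived"],
  #                                  ["=", "archived", True])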
2521
2522 - def QueryJobs(self, fields, qfilter):
2523 """Returns a list of jobs in queue. 2524 2525 @type fields: sequence 2526 @param fields: List of wanted fields 2527 @type qfilter: None or query2 filter (list) 2528 @param qfilter: Query filter 2529 2530 """ 2531 (qobj, ctx, _) = self._Query(fields, qfilter) 2532 2533 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2534
2535 - def OldStyleQueryJobs(self, job_ids, fields):
2536 """Returns a list of jobs in queue. 2537 2538 @type job_ids: list 2539 @param job_ids: sequence of job identifiers or None for all 2540 @type fields: list 2541 @param fields: names of fields to return 2542 @rtype: list 2543 @return: list one element per job, each element being list with 2544 the requested fields 2545 2546 """ 2547 # backwards compat: 2548 job_ids = [int(jid) for jid in job_ids] 2549 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2550 2551 (qobj, ctx, _) = self._Query(fields, qfilter) 2552 2553 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2554 2555 @locking.ssynchronized(_LOCK)
2556 - def PrepareShutdown(self):
2557 """Prepare to stop the job queue. 2558 2559 Disables execution of jobs in the workerpool and returns whether there are 2560 any jobs currently running. If the latter is the case, the job queue is not 2561 yet ready for shutdown. Once this function returns C{True} L{Shutdown} can 2562 be called without interfering with any job. Queued and unfinished jobs will 2563 be resumed next time. 2564 2565 Once this function has been called no new job submissions will be accepted 2566 (see L{_RequireNonDrainedQueue}). 2567 2568 @rtype: bool 2569 @return: Whether there are any running jobs 2570 2571 """ 2572 if self._accepting_jobs: 2573 self._accepting_jobs = False 2574 2575 # Tell worker pool to stop processing pending tasks 2576 self._wpool.SetActive(False) 2577 2578 return self._wpool.HasRunningTasks()
2579
2580 - def AcceptingJobsUnlocked(self):
2581 """Returns whether jobs are accepted. 2582 2583 Once L{PrepareShutdown} has been called, no new jobs are accepted and the 2584 queue is shutting down. 2585 2586 @rtype: bool 2587 2588 """ 2589 return self._accepting_jobs
2590 2591 @locking.ssynchronized(_LOCK) 2592 @_RequireOpenQueue
2593 - def Shutdown(self):
2594 """Stops the job queue. 2595 2596 This shutdowns all the worker threads an closes the queue. 2597 2598 """ 2599 self._wpool.TerminateWorkers() 2600 2601 self._queue_filelock.Close() 2602 self._queue_filelock = None
2603