
Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import logging 
  33  import errno 
  34  import time 
  35  import weakref 
  36  import threading 
  37  import itertools 
  38  import operator 
  39   
  40  try: 
  41    # pylint: disable=E0611 
  42    from pyinotify import pyinotify 
  43  except ImportError: 
  44    import pyinotify 
  45   
  46  from ganeti import asyncnotifier 
  47  from ganeti import constants 
  48  from ganeti import serializer 
  49  from ganeti import workerpool 
  50  from ganeti import locking 
  51  from ganeti import opcodes 
  52  from ganeti import errors 
  53  from ganeti import mcpu 
  54  from ganeti import utils 
  55  from ganeti import jstore 
  56  from ganeti import rpc 
  57  from ganeti import runtime 
  58  from ganeti import netutils 
  59  from ganeti import compat 
  60  from ganeti import ht 
  61  from ganeti import query 
  62  from ganeti import qlang 
  63  from ganeti import pathutils 
  64  from ganeti import vcluster 
  65   
  66   
  67  JOBQUEUE_THREADS = 25 
  68   
  69  # member lock names to be passed to @ssynchronized decorator 
  70  _LOCK = "_lock" 
  71  _QUEUE = "_queue" 
  72   
  73  #: Retrieves "id" attribute 
  74  _GetIdAttr = operator.attrgetter("id") 
75 76 77 -class CancelJob(Exception):
78 """Special exception to cancel a job. 79 80 """
81
82 83 -class QueueShutdown(Exception):
84 """Special exception to abort a job when the job queue is shutting down. 85 86 """
87
88 89 -def TimeStampNow():
90 """Returns the current timestamp. 91 92 @rtype: tuple 93 @return: the current time in the (seconds, microseconds) format 94 95 """ 96 return utils.SplitTime(time.time())
97
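As a rough illustration of the (seconds, microseconds) tuple mentioned above, the following sketch mimics what utils.SplitTime produces from a float timestamp; it is a simplified stand-in, not the actual ganeti.utils implementation:

    import time

    def split_time(value):
        # Split a float timestamp into whole seconds and microseconds.
        seconds = int(value)
        microseconds = int((value - seconds) * 1000000)
        return (seconds, microseconds)

    print(split_time(time.time()))  # e.g. (1400000000, 123456)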
98 99 -def _CallJqUpdate(runner, names, file_name, content):
100 """Updates job queue file after virtualizing filename. 101 102 """ 103 virt_file_name = vcluster.MakeVirtualPath(file_name) 104 return runner.call_jobqueue_update(names, virt_file_name, content)
105
106 107 -class _SimpleJobQuery:
108 """Wrapper for job queries. 109 110 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. 111 112 """
113 - def __init__(self, fields):
114 """Initializes this class. 115 116 """ 117 self._query = query.Query(query.JOB_FIELDS, fields)
118
119 - def __call__(self, job):
120 """Executes a job query using cached field list. 121 122 """ 123 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
125 126 -class _QueuedOpCode(object):
127 """Encapsulates an opcode object. 128 129 @ivar log: holds the execution log and consists of tuples 130 of the form C{(log_serial, timestamp, level, message)} 131 @ivar input: the OpCode we encapsulate 132 @ivar status: the current status 133 @ivar result: the result of the LU execution 134 @ivar start_timestamp: timestamp for the start of the execution 135 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 136 @ivar stop_timestamp: timestamp for the end of the execution 137 138 """ 139 __slots__ = ["input", "status", "result", "log", "priority", 140 "start_timestamp", "exec_timestamp", "end_timestamp", 141 "__weakref__"] 142
143 - def __init__(self, op):
144 """Initializes instances of this class. 145 146 @type op: L{opcodes.OpCode} 147 @param op: the opcode we encapsulate 148 149 """ 150 self.input = op 151 self.status = constants.OP_STATUS_QUEUED 152 self.result = None 153 self.log = [] 154 self.start_timestamp = None 155 self.exec_timestamp = None 156 self.end_timestamp = None 157 158 # Get initial priority (it might change during the lifetime of this opcode) 159 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160 161 @classmethod
162 - def Restore(cls, state):
163 """Restore the _QueuedOpCode from the serialized form. 164 165 @type state: dict 166 @param state: the serialized state 167 @rtype: _QueuedOpCode 168 @return: a new _QueuedOpCode instance 169 170 """ 171 obj = _QueuedOpCode.__new__(cls) 172 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 173 obj.status = state["status"] 174 obj.result = state["result"] 175 obj.log = state["log"] 176 obj.start_timestamp = state.get("start_timestamp", None) 177 obj.exec_timestamp = state.get("exec_timestamp", None) 178 obj.end_timestamp = state.get("end_timestamp", None) 179 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 180 return obj
181
182 - def Serialize(self):
183 """Serializes this _QueuedOpCode. 184 185 @rtype: dict 186 @return: the dictionary holding the serialized state 187 188 """ 189 return { 190 "input": self.input.__getstate__(), 191 "status": self.status, 192 "result": self.result, 193 "log": self.log, 194 "start_timestamp": self.start_timestamp, 195 "exec_timestamp": self.exec_timestamp, 196 "end_timestamp": self.end_timestamp, 197 "priority": self.priority, 198 }
199
200 201 -class _QueuedJob(object):
202 """In-memory job representation. 203 204 This is what we use to track the user-submitted jobs. Locking must 205 be taken care of by users of this class. 206 207 @type queue: L{JobQueue} 208 @ivar queue: the parent queue 209 @ivar id: the job ID 210 @type ops: list 211 @ivar ops: the list of _QueuedOpCode that constitute the job 212 @type log_serial: int 213 @ivar log_serial: holds the index for the next log entry 214 @ivar received_timestamp: the timestamp for when the job was received 215 @ivar start_timestmap: the timestamp for start of execution 216 @ivar end_timestamp: the timestamp for end of execution 217 @ivar writable: Whether the job is allowed to be modified 218 219 """ 220 # pylint: disable=W0212 221 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 222 "received_timestamp", "start_timestamp", "end_timestamp", 223 "__weakref__", "processor_lock", "writable", "archived"] 224
225 - def _AddReasons(self):
226 """Extend the reason trail 227 228 Add the reason for all the opcodes of this job to be executed. 229 230 """ 231 count = 0 232 for queued_op in self.ops: 233 op = queued_op.input 234 reason_src = opcodes.NameToReasonSrc(op.__class__.__name__) 235 reason_text = "job=%d;index=%d" % (self.id, count) 236 reason = getattr(op, "reason", []) 237 reason.append((reason_src, reason_text, utils.EpochNano())) 238 op.reason = reason 239 count = count + 1
240
241 - def __init__(self, queue, job_id, ops, writable):
242 """Constructor for the _QueuedJob. 243 244 @type queue: L{JobQueue} 245 @param queue: our parent queue 246 @type job_id: job_id 247 @param job_id: our job id 248 @type ops: list 249 @param ops: the list of opcodes we hold, which will be encapsulated 250 in _QueuedOpCodes 251 @type writable: bool 252 @param writable: Whether job can be modified 253 254 """ 255 if not ops: 256 raise errors.GenericError("A job needs at least one opcode") 257 258 self.queue = queue 259 self.id = int(job_id) 260 self.ops = [_QueuedOpCode(op) for op in ops] 261 self._AddReasons() 262 self.log_serial = 0 263 self.received_timestamp = TimeStampNow() 264 self.start_timestamp = None 265 self.end_timestamp = None 266 self.archived = False 267 268 self._InitInMemory(self, writable) 269 270 assert not self.archived, "New jobs can not be marked as archived"
271 272 @staticmethod
273 - def _InitInMemory(obj, writable):
274 """Initializes in-memory variables. 275 276 """ 277 obj.writable = writable 278 obj.ops_iter = None 279 obj.cur_opctx = None 280 281 # Read-only jobs are not processed and therefore don't need a lock 282 if writable: 283 obj.processor_lock = threading.Lock() 284 else: 285 obj.processor_lock = None
286
287 - def __repr__(self):
288 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 289 "id=%s" % self.id, 290 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 291 292 return "<%s at %#x>" % (" ".join(status), id(self))
293 294 @classmethod
295 - def Restore(cls, queue, state, writable, archived):
296 """Restore a _QueuedJob from serialized state: 297 298 @type queue: L{JobQueue} 299 @param queue: to which queue the restored job belongs 300 @type state: dict 301 @param state: the serialized state 302 @type writable: bool 303 @param writable: Whether job can be modified 304 @type archived: bool 305 @param archived: Whether job was already archived 306 @rtype: _JobQueue 307 @return: the restored _JobQueue instance 308 309 """ 310 obj = _QueuedJob.__new__(cls) 311 obj.queue = queue 312 obj.id = int(state["id"]) 313 obj.received_timestamp = state.get("received_timestamp", None) 314 obj.start_timestamp = state.get("start_timestamp", None) 315 obj.end_timestamp = state.get("end_timestamp", None) 316 obj.archived = archived 317 318 obj.ops = [] 319 obj.log_serial = 0 320 for op_state in state["ops"]: 321 op = _QueuedOpCode.Restore(op_state) 322 for log_entry in op.log: 323 obj.log_serial = max(obj.log_serial, log_entry[0]) 324 obj.ops.append(op) 325 326 cls._InitInMemory(obj, writable) 327 328 return obj
329
330 - def Serialize(self):
331 """Serialize the _JobQueue instance. 332 333 @rtype: dict 334 @return: the serialized state 335 336 """ 337 return { 338 "id": self.id, 339 "ops": [op.Serialize() for op in self.ops], 340 "start_timestamp": self.start_timestamp, 341 "end_timestamp": self.end_timestamp, 342 "received_timestamp": self.received_timestamp, 343 }
344
345 - def CalcStatus(self):
346 """Compute the status of this job. 347 348 This function iterates over all the _QueuedOpCodes in the job and 349 based on their status, computes the job status. 350 351 The algorithm is: 352 - if we find a cancelled, or finished with error, the job 353 status will be the same 354 - otherwise, the last opcode with the status one of: 355 - waitlock 356 - canceling 357 - running 358 359 will determine the job status 360 361 - otherwise, it means either all opcodes are queued, or success, 362 and the job status will be the same 363 364 @return: the job status 365 366 """ 367 status = constants.JOB_STATUS_QUEUED 368 369 all_success = True 370 for op in self.ops: 371 if op.status == constants.OP_STATUS_SUCCESS: 372 continue 373 374 all_success = False 375 376 if op.status == constants.OP_STATUS_QUEUED: 377 pass 378 elif op.status == constants.OP_STATUS_WAITING: 379 status = constants.JOB_STATUS_WAITING 380 elif op.status == constants.OP_STATUS_RUNNING: 381 status = constants.JOB_STATUS_RUNNING 382 elif op.status == constants.OP_STATUS_CANCELING: 383 status = constants.JOB_STATUS_CANCELING 384 break 385 elif op.status == constants.OP_STATUS_ERROR: 386 status = constants.JOB_STATUS_ERROR 387 # The whole job fails if one opcode failed 388 break 389 elif op.status == constants.OP_STATUS_CANCELED: 390 status = constants.OP_STATUS_CANCELED 391 break 392 393 if all_success: 394 status = constants.JOB_STATUS_SUCCESS 395 396 return status
397
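The aggregation rule described in the CalcStatus docstring can be seen in isolation with plain strings standing in for the ganeti constants; this is a minimal sketch of the idea, not the method itself:

    def calc_status(op_statuses):
        # Error/canceled/canceling opcodes decide the job status immediately,
        # otherwise the last non-success status wins, and a job whose opcodes
        # are all successful is itself successful.
        status = "queued"
        all_success = True
        for op_status in op_statuses:
            if op_status == "success":
                continue
            all_success = False
            if op_status in ("waiting", "running"):
                status = op_status
            elif op_status in ("canceling", "canceled", "error"):
                status = op_status
                break
        if all_success:
            status = "success"
        return status

    print(calc_status(["success", "running", "queued"]))  # running
    print(calc_status(["success", "success"]))            # success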
398 - def CalcPriority(self):
399 """Gets the current priority for this job. 400 401 Only unfinished opcodes are considered. When all are done, the default 402 priority is used. 403 404 @rtype: int 405 406 """ 407 priorities = [op.priority for op in self.ops 408 if op.status not in constants.OPS_FINALIZED] 409 410 if not priorities: 411 # All opcodes are done, assume default priority 412 return constants.OP_PRIO_DEFAULT 413 414 return min(priorities)
415
416 - def GetLogEntries(self, newer_than):
417 """Selectively returns the log entries. 418 419 @type newer_than: None or int 420 @param newer_than: if this is None, return all log entries, 421 otherwise return only the log entries with serial higher 422 than this value 423 @rtype: list 424 @return: the list of the log entries selected 425 426 """ 427 if newer_than is None: 428 serial = -1 429 else: 430 serial = newer_than 431 432 entries = [] 433 for op in self.ops: 434 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 435 436 return entries
437
438 - def GetInfo(self, fields):
439 """Returns information about a job. 440 441 @type fields: list 442 @param fields: names of fields to return 443 @rtype: list 444 @return: list with one element for each field 445 @raise errors.OpExecError: when an invalid field 446 has been passed 447 448 """ 449 return _SimpleJobQuery(fields)(self)
450
451 - def MarkUnfinishedOps(self, status, result):
452 """Mark unfinished opcodes with a given status and result. 453 454 This is an utility function for marking all running or waiting to 455 be run opcodes with a given status. Opcodes which are already 456 finalised are not changed. 457 458 @param status: a given opcode status 459 @param result: the opcode result 460 461 """ 462 not_marked = True 463 for op in self.ops: 464 if op.status in constants.OPS_FINALIZED: 465 assert not_marked, "Finalized opcodes found after non-finalized ones" 466 continue 467 op.status = status 468 op.result = result 469 not_marked = False
470
471 - def Finalize(self):
472 """Marks the job as finalized. 473 474 """ 475 self.end_timestamp = TimeStampNow()
476
477 - def Cancel(self):
478 """Marks job as canceled/-ing if possible. 479 480 @rtype: tuple; (bool, string) 481 @return: Boolean describing whether job was successfully canceled or marked 482 as canceling and a text message 483 484 """ 485 status = self.CalcStatus() 486 487 if status == constants.JOB_STATUS_QUEUED: 488 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 489 "Job canceled by request") 490 self.Finalize() 491 return (True, "Job %s canceled" % self.id) 492 493 elif status == constants.JOB_STATUS_WAITING: 494 # The worker will notice the new status and cancel the job 495 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 496 return (True, "Job %s will be canceled" % self.id) 497 498 else: 499 logging.debug("Job %s is no longer waiting in the queue", self.id) 500 return (False, "Job %s is no longer waiting in the queue" % self.id)
501
502 - def ChangePriority(self, priority):
503 """Changes the job priority. 504 505 @type priority: int 506 @param priority: New priority 507 @rtype: tuple; (bool, string) 508 @return: Boolean describing whether job's priority was successfully changed 509 and a text message 510 511 """ 512 status = self.CalcStatus() 513 514 if status in constants.JOBS_FINALIZED: 515 return (False, "Job %s is finished" % self.id) 516 elif status == constants.JOB_STATUS_CANCELING: 517 return (False, "Job %s is cancelling" % self.id) 518 else: 519 assert status in (constants.JOB_STATUS_QUEUED, 520 constants.JOB_STATUS_WAITING, 521 constants.JOB_STATUS_RUNNING) 522 523 changed = False 524 for op in self.ops: 525 if (op.status == constants.OP_STATUS_RUNNING or 526 op.status in constants.OPS_FINALIZED): 527 assert not changed, \ 528 ("Found opcode for which priority should not be changed after" 529 " priority has been changed for previous opcodes") 530 continue 531 532 assert op.status in (constants.OP_STATUS_QUEUED, 533 constants.OP_STATUS_WAITING) 534 535 changed = True 536 537 # Set new priority (doesn't modify opcode input) 538 op.priority = priority 539 540 if changed: 541 return (True, ("Priorities of pending opcodes for job %s have been" 542 " changed to %s" % (self.id, priority))) 543 else: 544 return (False, "Job %s had no pending opcodes" % self.id)
545
546 547 -class _OpExecCallbacks(mcpu.OpExecCbBase):
548 - def __init__(self, queue, job, op):
549 """Initializes this class. 550 551 @type queue: L{JobQueue} 552 @param queue: Job queue 553 @type job: L{_QueuedJob} 554 @param job: Job object 555 @type op: L{_QueuedOpCode} 556 @param op: OpCode 557 558 """ 559 assert queue, "Queue is missing" 560 assert job, "Job is missing" 561 assert op, "Opcode is missing" 562 563 self._queue = queue 564 self._job = job 565 self._op = op
566
567 - def _CheckCancel(self):
568 """Raises an exception to cancel the job if asked to. 569 570 """ 571 # Cancel here if we were asked to 572 if self._op.status == constants.OP_STATUS_CANCELING: 573 logging.debug("Canceling opcode") 574 raise CancelJob() 575 576 # See if queue is shutting down 577 if not self._queue.AcceptingJobsUnlocked(): 578 logging.debug("Queue is shutting down") 579 raise QueueShutdown()
580 581 @locking.ssynchronized(_QUEUE, shared=1)
582 - def NotifyStart(self):
583 """Mark the opcode as running, not lock-waiting. 584 585 This is called from the mcpu code as a notifier function, when the LU is 586 finally about to start the Exec() method. Of course, to have end-user 587 visible results, the opcode must be initially (before calling into 588 Processor.ExecOpCode) set to OP_STATUS_WAITING. 589 590 """ 591 assert self._op in self._job.ops 592 assert self._op.status in (constants.OP_STATUS_WAITING, 593 constants.OP_STATUS_CANCELING) 594 595 # Cancel here if we were asked to 596 self._CheckCancel() 597 598 logging.debug("Opcode is now running") 599 600 self._op.status = constants.OP_STATUS_RUNNING 601 self._op.exec_timestamp = TimeStampNow() 602 603 # And finally replicate the job status 604 self._queue.UpdateJobUnlocked(self._job)
605 606 @locking.ssynchronized(_QUEUE, shared=1)
607 - def _AppendFeedback(self, timestamp, log_type, log_msg):
608 """Internal feedback append function, with locks 609 610 """ 611 self._job.log_serial += 1 612 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 613 self._queue.UpdateJobUnlocked(self._job, replicate=False)
614
615 - def Feedback(self, *args):
616 """Append a log entry. 617 618 """ 619 assert len(args) < 3 620 621 if len(args) == 1: 622 log_type = constants.ELOG_MESSAGE 623 log_msg = args[0] 624 else: 625 (log_type, log_msg) = args 626 627 # The time is split to make serialization easier and not lose 628 # precision. 629 timestamp = utils.SplitTime(time.time()) 630 self._AppendFeedback(timestamp, log_type, log_msg)
631
632 - def CurrentPriority(self):
633 """Returns current priority for opcode. 634 635 """ 636 assert self._op.status in (constants.OP_STATUS_WAITING, 637 constants.OP_STATUS_CANCELING) 638 639 # Cancel here if we were asked to 640 self._CheckCancel() 641 642 return self._op.priority
643
644 - def SubmitManyJobs(self, jobs):
645 """Submits jobs for processing. 646 647 See L{JobQueue.SubmitManyJobs}. 648 649 """ 650 # Locking is done in job queue 651 return self._queue.SubmitManyJobs(jobs)
652
653 654 -class _JobChangesChecker(object):
655 - def __init__(self, fields, prev_job_info, prev_log_serial):
656 """Initializes this class. 657 658 @type fields: list of strings 659 @param fields: Fields requested by LUXI client 660 @type prev_job_info: string 661 @param prev_job_info: previous job info, as passed by the LUXI client 662 @type prev_log_serial: string 663 @param prev_log_serial: previous job serial, as passed by the LUXI client 664 665 """ 666 self._squery = _SimpleJobQuery(fields) 667 self._prev_job_info = prev_job_info 668 self._prev_log_serial = prev_log_serial
669
670 - def __call__(self, job):
671 """Checks whether job has changed. 672 673 @type job: L{_QueuedJob} 674 @param job: Job object 675 676 """ 677 assert not job.writable, "Expected read-only job" 678 679 status = job.CalcStatus() 680 job_info = self._squery(job) 681 log_entries = job.GetLogEntries(self._prev_log_serial) 682 683 # Serializing and deserializing data can cause type changes (e.g. from 684 # tuple to list) or precision loss. We're doing it here so that we get 685 # the same modifications as the data received from the client. Without 686 # this, the comparison afterwards might fail without the data being 687 # significantly different. 688 # TODO: we just deserialized from disk, investigate how to make sure that 689 # the job info and log entries are compatible to avoid this further step. 690 # TODO: Doing something like in testutils.py:UnifyValueType might be more 691 # efficient, though floats will be tricky 692 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 693 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 694 695 # Don't even try to wait if the job is no longer running, there will be 696 # no changes. 697 if (status not in (constants.JOB_STATUS_QUEUED, 698 constants.JOB_STATUS_RUNNING, 699 constants.JOB_STATUS_WAITING) or 700 job_info != self._prev_job_info or 701 (log_entries and self._prev_log_serial != log_entries[0][0])): 702 logging.debug("Job %s changed", job.id) 703 return (job_info, log_entries) 704 705 return None
706
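The serializer round-trip in __call__ above matters because JSON has no tuple type, so data that travelled to a client comes back subtly different from the in-memory objects; a minimal sketch of the effect using the standard json module (the real code goes through ganeti.serializer):

    import json

    entry = (42, [1400000000, 123456], "message", "hello")
    roundtripped = json.loads(json.dumps(entry))

    print(roundtripped)           # the outer tuple is now a list
    print(entry == roundtripped)  # False: comparison fails on the type change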
707 708 -class _JobFileChangesWaiter(object):
709 - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710 """Initializes this class. 711 712 @type filename: string 713 @param filename: Path to job file 714 @raises errors.InotifyError: if the notifier cannot be setup 715 716 """ 717 self._wm = _inotify_wm_cls() 718 self._inotify_handler = \ 719 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 720 self._notifier = \ 721 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 722 try: 723 self._inotify_handler.enable() 724 except Exception: 725 # pyinotify doesn't close file descriptors automatically 726 self._notifier.stop() 727 raise
728
729 - def _OnInotify(self, notifier_enabled):
730 """Callback for inotify. 731 732 """ 733 if not notifier_enabled: 734 self._inotify_handler.enable()
735
736 - def Wait(self, timeout):
737 """Waits for the job file to change. 738 739 @type timeout: float 740 @param timeout: Timeout in seconds 741 @return: Whether there have been events 742 743 """ 744 assert timeout >= 0 745 have_events = self._notifier.check_events(timeout * 1000) 746 if have_events: 747 self._notifier.read_events() 748 self._notifier.process_events() 749 return have_events
750
751 - def Close(self):
752 """Closes underlying notifier and its file descriptor. 753 754 """ 755 self._notifier.stop()
756
757 758 -class _JobChangesWaiter(object):
759 - def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
760 """Initializes this class. 761 762 @type filename: string 763 @param filename: Path to job file 764 765 """ 766 self._filewaiter = None 767 self._filename = filename 768 self._waiter_cls = _waiter_cls
769
770 - def Wait(self, timeout):
771 """Waits for a job to change. 772 773 @type timeout: float 774 @param timeout: Timeout in seconds 775 @return: Whether there have been events 776 777 """ 778 if self._filewaiter: 779 return self._filewaiter.Wait(timeout) 780 781 # Lazy setup: Avoid inotify setup cost when job file has already changed. 782 # If this point is reached, return immediately and let caller check the job 783 # file again in case there were changes since the last check. This avoids a 784 # race condition. 785 self._filewaiter = self._waiter_cls(self._filename) 786 787 return True
788
789 - def Close(self):
790 """Closes underlying waiter. 791 792 """ 793 if self._filewaiter: 794 self._filewaiter.Close()
795
796 797 -class _WaitForJobChangesHelper(object):
798 """Helper class using inotify to wait for changes in a job file. 799 800 This class takes a previous job status and serial, and alerts the client when 801 the current job status has changed. 802 803 """ 804 @staticmethod
805 - def _CheckForChanges(counter, job_load_fn, check_fn):
806 if counter.next() > 0: 807 # If this isn't the first check the job is given some more time to change 808 # again. This gives better performance for jobs generating many 809 # changes/messages. 810 time.sleep(0.1) 811 812 job = job_load_fn() 813 if not job: 814 raise errors.JobLost() 815 816 result = check_fn(job) 817 if result is None: 818 raise utils.RetryAgain() 819 820 return result
821
822 - def __call__(self, filename, job_load_fn, 823 fields, prev_job_info, prev_log_serial, timeout, 824 _waiter_cls=_JobChangesWaiter):
825 """Waits for changes on a job. 826 827 @type filename: string 828 @param filename: File on which to wait for changes 829 @type job_load_fn: callable 830 @param job_load_fn: Function to load job 831 @type fields: list of strings 832 @param fields: Which fields to check for changes 833 @type prev_job_info: list or None 834 @param prev_job_info: Last job information returned 835 @type prev_log_serial: int 836 @param prev_log_serial: Last job message serial number 837 @type timeout: float 838 @param timeout: maximum time to wait in seconds 839 840 """ 841 counter = itertools.count() 842 try: 843 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 844 waiter = _waiter_cls(filename) 845 try: 846 return utils.Retry(compat.partial(self._CheckForChanges, 847 counter, job_load_fn, check_fn), 848 utils.RETRY_REMAINING_TIME, timeout, 849 wait_fn=waiter.Wait) 850 finally: 851 waiter.Close() 852 except errors.JobLost: 853 return None 854 except utils.RetryTimeout: 855 return constants.JOB_NOTCHANGED
856
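The retry loop above combines a change-checking callback with the waiter's Wait() method as its sleep primitive; a rough stand-alone sketch of that pattern in plain Python (wait_for_change and its parameters are illustrative names, not ganeti APIs):

    import time

    def wait_for_change(check_fn, wait_fn, timeout, step=1.0):
        # check_fn() returns None while nothing has changed, otherwise a result.
        deadline = time.time() + timeout
        while True:
            result = check_fn()
            if result is not None:
                return result
            remaining = deadline - time.time()
            if remaining <= 0:
                return None  # a caller would map this to JOB_NOTCHANGED
            # Block on the waiter (e.g. inotify) instead of busy-sleeping.
            wait_fn(min(step, remaining))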
857 858 -def _EncodeOpError(err):
859 """Encodes an error which occurred while processing an opcode. 860 861 """ 862 if isinstance(err, errors.GenericError): 863 to_encode = err 864 else: 865 to_encode = errors.OpExecError(str(err)) 866 867 return errors.EncodeException(to_encode)
868
869 870 -class _TimeoutStrategyWrapper:
871 - def __init__(self, fn):
872 """Initializes this class. 873 874 """ 875 self._fn = fn 876 self._next = None
877
878 - def _Advance(self):
879 """Gets the next timeout if necessary. 880 881 """ 882 if self._next is None: 883 self._next = self._fn()
884
885 - def Peek(self):
886 """Returns the next timeout. 887 888 """ 889 self._Advance() 890 return self._next
891
892 - def Next(self):
893 """Returns the current timeout and advances the internal state. 894 895 """ 896 self._Advance() 897 result = self._next 898 self._next = None 899 return result
900
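Typical use of the wrapper looks roughly like this; the lambda below is an illustrative stand-in for the NextAttempt method of mcpu.LockAttemptTimeoutStrategy:

    timeouts = iter([0.1, 0.5, None])  # None would mean a blocking acquire
    wrapper = _TimeoutStrategyWrapper(lambda: next(timeouts))

    print(wrapper.Peek())  # 0.1, computed but not consumed
    print(wrapper.Next())  # 0.1, consumed; the following call advances
    print(wrapper.Next())  # 0.5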
901 902 -class _OpExecContext:
903 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904 """Initializes this class. 905 906 """ 907 self.op = op 908 self.index = index 909 self.log_prefix = log_prefix 910 self.summary = op.input.Summary() 911 912 # Create local copy to modify 913 if getattr(op.input, opcodes.DEPEND_ATTR, None): 914 self.jobdeps = op.input.depends[:] 915 else: 916 self.jobdeps = None 917 918 self._timeout_strategy_factory = timeout_strategy_factory 919 self._ResetTimeoutStrategy()
920
921 - def _ResetTimeoutStrategy(self):
922 """Creates a new timeout strategy. 923 924 """ 925 self._timeout_strategy = \ 926 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927
928 - def CheckPriorityIncrease(self):
929 """Checks whether priority can and should be increased. 930 931 Called when locks couldn't be acquired. 932 933 """ 934 op = self.op 935 936 # Exhausted all retries and next round should not use blocking acquire 937 # for locks? 938 if (self._timeout_strategy.Peek() is None and 939 op.priority > constants.OP_PRIO_HIGHEST): 940 logging.debug("Increasing priority") 941 op.priority -= 1 942 self._ResetTimeoutStrategy() 943 return True 944 945 return False
946
947 - def GetNextLockTimeout(self):
948 """Returns the next lock acquire timeout. 949 950 """ 951 return self._timeout_strategy.Next()
952
953 954 -class _JobProcessor(object):
955 (DEFER, 956 WAITDEP, 957 FINISHED) = range(1, 4) 958
959 - def __init__(self, queue, opexec_fn, job, 960 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961 """Initializes this class. 962 963 """ 964 self.queue = queue 965 self.opexec_fn = opexec_fn 966 self.job = job 967 self._timeout_strategy_factory = _timeout_strategy_factory
968 969 @staticmethod
970 - def _FindNextOpcode(job, timeout_strategy_factory):
971 """Locates the next opcode to run. 972 973 @type job: L{_QueuedJob} 974 @param job: Job object 975 @param timeout_strategy_factory: Callable to create new timeout strategy 976 977 """ 978 # Create some sort of a cache to speed up locating next opcode for future 979 # lookups 980 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 981 # pending and one for processed ops. 982 if job.ops_iter is None: 983 job.ops_iter = enumerate(job.ops) 984 985 # Find next opcode to run 986 while True: 987 try: 988 (idx, op) = job.ops_iter.next() 989 except StopIteration: 990 raise errors.ProgrammerError("Called for a finished job") 991 992 if op.status == constants.OP_STATUS_RUNNING: 993 # Found an opcode already marked as running 994 raise errors.ProgrammerError("Called for job marked as running") 995 996 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 997 timeout_strategy_factory) 998 999 if op.status not in constants.OPS_FINALIZED: 1000 return opctx 1001 1002 # This is a job that was partially completed before master daemon 1003 # shutdown, so it can be expected that some opcodes are already 1004 # completed successfully (if any did error out, then the whole job 1005 # should have been aborted and not resubmitted for processing). 1006 logging.info("%s: opcode %s already processed, skipping", 1007 opctx.log_prefix, opctx.summary)
1008 1009 @staticmethod
1010 - def _MarkWaitlock(job, op):
1011 """Marks an opcode as waiting for locks. 1012 1013 The job's start timestamp is also set if necessary. 1014 1015 @type job: L{_QueuedJob} 1016 @param job: Job object 1017 @type op: L{_QueuedOpCode} 1018 @param op: Opcode object 1019 1020 """ 1021 assert op in job.ops 1022 assert op.status in (constants.OP_STATUS_QUEUED, 1023 constants.OP_STATUS_WAITING) 1024 1025 update = False 1026 1027 op.result = None 1028 1029 if op.status == constants.OP_STATUS_QUEUED: 1030 op.status = constants.OP_STATUS_WAITING 1031 update = True 1032 1033 if op.start_timestamp is None: 1034 op.start_timestamp = TimeStampNow() 1035 update = True 1036 1037 if job.start_timestamp is None: 1038 job.start_timestamp = op.start_timestamp 1039 update = True 1040 1041 assert op.status == constants.OP_STATUS_WAITING 1042 1043 return update
1044 1045 @staticmethod
1046 - def _CheckDependencies(queue, job, opctx):
1047 """Checks if an opcode has dependencies and if so, processes them. 1048 1049 @type queue: L{JobQueue} 1050 @param queue: Queue object 1051 @type job: L{_QueuedJob} 1052 @param job: Job object 1053 @type opctx: L{_OpExecContext} 1054 @param opctx: Opcode execution context 1055 @rtype: bool 1056 @return: Whether opcode will be re-scheduled by dependency tracker 1057 1058 """ 1059 op = opctx.op 1060 1061 result = False 1062 1063 while opctx.jobdeps: 1064 (dep_job_id, dep_status) = opctx.jobdeps[0] 1065 1066 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 1067 dep_status) 1068 assert ht.TNonEmptyString(depmsg), "No dependency message" 1069 1070 logging.info("%s: %s", opctx.log_prefix, depmsg) 1071 1072 if depresult == _JobDependencyManager.CONTINUE: 1073 # Remove dependency and continue 1074 opctx.jobdeps.pop(0) 1075 1076 elif depresult == _JobDependencyManager.WAIT: 1077 # Need to wait for notification, dependency tracker will re-add job 1078 # to workerpool 1079 result = True 1080 break 1081 1082 elif depresult == _JobDependencyManager.CANCEL: 1083 # Job was cancelled, cancel this job as well 1084 job.Cancel() 1085 assert op.status == constants.OP_STATUS_CANCELING 1086 break 1087 1088 elif depresult in (_JobDependencyManager.WRONGSTATUS, 1089 _JobDependencyManager.ERROR): 1090 # Job failed or there was an error, this job must fail 1091 op.status = constants.OP_STATUS_ERROR 1092 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 1093 break 1094 1095 else: 1096 raise errors.ProgrammerError("Unknown dependency result '%s'" % 1097 depresult) 1098 1099 return result
1100
1101 - def _ExecOpCodeUnlocked(self, opctx):
1102 """Processes one opcode and returns the result. 1103 1104 """ 1105 op = opctx.op 1106 1107 assert op.status in (constants.OP_STATUS_WAITING, 1108 constants.OP_STATUS_CANCELING) 1109 1110 # The very last check if the job was cancelled before trying to execute 1111 if op.status == constants.OP_STATUS_CANCELING: 1112 return (constants.OP_STATUS_CANCELING, None) 1113 1114 timeout = opctx.GetNextLockTimeout() 1115 1116 try: 1117 # Make sure not to hold queue lock while calling ExecOpCode 1118 result = self.opexec_fn(op.input, 1119 _OpExecCallbacks(self.queue, self.job, op), 1120 timeout=timeout) 1121 except mcpu.LockAcquireTimeout: 1122 assert timeout is not None, "Received timeout for blocking acquire" 1123 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 1124 1125 assert op.status in (constants.OP_STATUS_WAITING, 1126 constants.OP_STATUS_CANCELING) 1127 1128 # Was job cancelled while we were waiting for the lock? 1129 if op.status == constants.OP_STATUS_CANCELING: 1130 return (constants.OP_STATUS_CANCELING, None) 1131 1132 # Queue is shutting down, return to queued 1133 if not self.queue.AcceptingJobsUnlocked(): 1134 return (constants.OP_STATUS_QUEUED, None) 1135 1136 # Stay in waitlock while trying to re-acquire lock 1137 return (constants.OP_STATUS_WAITING, None) 1138 except CancelJob: 1139 logging.exception("%s: Canceling job", opctx.log_prefix) 1140 assert op.status == constants.OP_STATUS_CANCELING 1141 return (constants.OP_STATUS_CANCELING, None) 1142 1143 except QueueShutdown: 1144 logging.exception("%s: Queue is shutting down", opctx.log_prefix) 1145 1146 assert op.status == constants.OP_STATUS_WAITING 1147 1148 # Job hadn't been started yet, so it should return to the queue 1149 return (constants.OP_STATUS_QUEUED, None) 1150 1151 except Exception, err: # pylint: disable=W0703 1152 logging.exception("%s: Caught exception in %s", 1153 opctx.log_prefix, opctx.summary) 1154 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 1155 else: 1156 logging.debug("%s: %s successful", 1157 opctx.log_prefix, opctx.summary) 1158 return (constants.OP_STATUS_SUCCESS, result)
1159
1160 - def __call__(self, _nextop_fn=None):
1161 """Continues execution of a job. 1162 1163 @param _nextop_fn: Callback function for tests 1164 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 1165 be deferred and C{WAITDEP} if the dependency manager 1166 (L{_JobDependencyManager}) will re-schedule the job when appropriate 1167 1168 """ 1169 queue = self.queue 1170 job = self.job 1171 1172 logging.debug("Processing job %s", job.id) 1173 1174 queue.acquire(shared=1) 1175 try: 1176 opcount = len(job.ops) 1177 1178 assert job.writable, "Expected writable job" 1179 1180 # Don't do anything for finalized jobs 1181 if job.CalcStatus() in constants.JOBS_FINALIZED: 1182 return self.FINISHED 1183 1184 # Is a previous opcode still pending? 1185 if job.cur_opctx: 1186 opctx = job.cur_opctx 1187 job.cur_opctx = None 1188 else: 1189 if __debug__ and _nextop_fn: 1190 _nextop_fn() 1191 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 1192 1193 op = opctx.op 1194 1195 # Consistency check 1196 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 1197 constants.OP_STATUS_CANCELING) 1198 for i in job.ops[opctx.index + 1:]) 1199 1200 assert op.status in (constants.OP_STATUS_QUEUED, 1201 constants.OP_STATUS_WAITING, 1202 constants.OP_STATUS_CANCELING) 1203 1204 assert (op.priority <= constants.OP_PRIO_LOWEST and 1205 op.priority >= constants.OP_PRIO_HIGHEST) 1206 1207 waitjob = None 1208 1209 if op.status != constants.OP_STATUS_CANCELING: 1210 assert op.status in (constants.OP_STATUS_QUEUED, 1211 constants.OP_STATUS_WAITING) 1212 1213 # Prepare to start opcode 1214 if self._MarkWaitlock(job, op): 1215 # Write to disk 1216 queue.UpdateJobUnlocked(job) 1217 1218 assert op.status == constants.OP_STATUS_WAITING 1219 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1220 assert job.start_timestamp and op.start_timestamp 1221 assert waitjob is None 1222 1223 # Check if waiting for a job is necessary 1224 waitjob = self._CheckDependencies(queue, job, opctx) 1225 1226 assert op.status in (constants.OP_STATUS_WAITING, 1227 constants.OP_STATUS_CANCELING, 1228 constants.OP_STATUS_ERROR) 1229 1230 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1231 constants.OP_STATUS_ERROR)): 1232 logging.info("%s: opcode %s waiting for locks", 1233 opctx.log_prefix, opctx.summary) 1234 1235 assert not opctx.jobdeps, "Not all dependencies were removed" 1236 1237 queue.release() 1238 try: 1239 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1240 finally: 1241 queue.acquire(shared=1) 1242 1243 op.status = op_status 1244 op.result = op_result 1245 1246 assert not waitjob 1247 1248 if op.status in (constants.OP_STATUS_WAITING, 1249 constants.OP_STATUS_QUEUED): 1250 # waiting: Couldn't get locks in time 1251 # queued: Queue is shutting down 1252 assert not op.end_timestamp 1253 else: 1254 # Finalize opcode 1255 op.end_timestamp = TimeStampNow() 1256 1257 if op.status == constants.OP_STATUS_CANCELING: 1258 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1259 for i in job.ops[opctx.index:]) 1260 else: 1261 assert op.status in constants.OPS_FINALIZED 1262 1263 if op.status == constants.OP_STATUS_QUEUED: 1264 # Queue is shutting down 1265 assert not waitjob 1266 1267 finalize = False 1268 1269 # Reset context 1270 job.cur_opctx = None 1271 1272 # In no case must the status be finalized here 1273 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1274 1275 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1276 finalize = False 1277 1278 if not waitjob and 
opctx.CheckPriorityIncrease(): 1279 # Priority was changed, need to update on-disk file 1280 queue.UpdateJobUnlocked(job) 1281 1282 # Keep around for another round 1283 job.cur_opctx = opctx 1284 1285 assert (op.priority <= constants.OP_PRIO_LOWEST and 1286 op.priority >= constants.OP_PRIO_HIGHEST) 1287 1288 # In no case must the status be finalized here 1289 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1290 1291 else: 1292 # Ensure all opcodes so far have been successful 1293 assert (opctx.index == 0 or 1294 compat.all(i.status == constants.OP_STATUS_SUCCESS 1295 for i in job.ops[:opctx.index])) 1296 1297 # Reset context 1298 job.cur_opctx = None 1299 1300 if op.status == constants.OP_STATUS_SUCCESS: 1301 finalize = False 1302 1303 elif op.status == constants.OP_STATUS_ERROR: 1304 # Ensure failed opcode has an exception as its result 1305 assert errors.GetEncodedError(job.ops[opctx.index].result) 1306 1307 to_encode = errors.OpExecError("Preceding opcode failed") 1308 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1309 _EncodeOpError(to_encode)) 1310 finalize = True 1311 1312 # Consistency check 1313 assert compat.all(i.status == constants.OP_STATUS_ERROR and 1314 errors.GetEncodedError(i.result) 1315 for i in job.ops[opctx.index:]) 1316 1317 elif op.status == constants.OP_STATUS_CANCELING: 1318 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1319 "Job canceled by request") 1320 finalize = True 1321 1322 else: 1323 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1324 1325 if opctx.index == (opcount - 1): 1326 # Finalize on last opcode 1327 finalize = True 1328 1329 if finalize: 1330 # All opcodes have been run, finalize job 1331 job.Finalize() 1332 1333 # Write to disk. If the job status is final, this is the final write 1334 # allowed. Once the file has been written, it can be archived anytime. 1335 queue.UpdateJobUnlocked(job) 1336 1337 assert not waitjob 1338 1339 if finalize: 1340 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1341 return self.FINISHED 1342 1343 assert not waitjob or queue.depmgr.JobWaiting(job) 1344 1345 if waitjob: 1346 return self.WAITDEP 1347 else: 1348 return self.DEFER 1349 finally: 1350 assert job.writable, "Job became read-only while being processed" 1351 queue.release()
1352
1353 1354 -def _EvaluateJobProcessorResult(depmgr, job, result):
1355 """Looks at a result from L{_JobProcessor} for a job. 1356 1357 To be used in a L{_JobQueueWorker}. 1358 1359 """ 1360 if result == _JobProcessor.FINISHED: 1361 # Notify waiting jobs 1362 depmgr.NotifyWaiters(job.id) 1363 1364 elif result == _JobProcessor.DEFER: 1365 # Schedule again 1366 raise workerpool.DeferTask(priority=job.CalcPriority()) 1367 1368 elif result == _JobProcessor.WAITDEP: 1369 # No-op, dependency manager will re-schedule 1370 pass 1371 1372 else: 1373 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1374 (result, ))
1375
1376 1377 -class _JobQueueWorker(workerpool.BaseWorker):
1378 """The actual job workers. 1379 1380 """
1381 - def RunTask(self, job): # pylint: disable=W0221
1382 """Job executor. 1383 1384 @type job: L{_QueuedJob} 1385 @param job: the job to be processed 1386 1387 """ 1388 assert job.writable, "Expected writable job" 1389 1390 # Ensure only one worker is active on a single job. If a job registers for 1391 # a dependency job, and the other job notifies before the first worker is 1392 # done, the job can end up in the tasklist more than once. 1393 job.processor_lock.acquire() 1394 try: 1395 return self._RunTaskInner(job) 1396 finally: 1397 job.processor_lock.release()
1398
1399 - def _RunTaskInner(self, job):
1400 """Executes a job. 1401 1402 Must be called with per-job lock acquired. 1403 1404 """ 1405 queue = job.queue 1406 assert queue == self.pool.queue 1407 1408 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1409 setname_fn(None) 1410 1411 proc = mcpu.Processor(queue.context, job.id) 1412 1413 # Create wrapper for setting thread name 1414 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1415 proc.ExecOpCode) 1416 1417 _EvaluateJobProcessorResult(queue.depmgr, job, 1418 _JobProcessor(queue, wrap_execop_fn, job)())
1419 1420 @staticmethod
1421 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1422 """Updates the worker thread name to include a short summary of the opcode. 1423 1424 @param setname_fn: Callable setting worker thread name 1425 @param execop_fn: Callable for executing opcode (usually 1426 L{mcpu.Processor.ExecOpCode}) 1427 1428 """ 1429 setname_fn(op) 1430 try: 1431 return execop_fn(op, *args, **kwargs) 1432 finally: 1433 setname_fn(None)
1434 1435 @staticmethod
1436 - def _GetWorkerName(job, op):
1437 """Sets the worker thread name. 1438 1439 @type job: L{_QueuedJob} 1440 @type op: L{opcodes.OpCode} 1441 1442 """ 1443 parts = ["Job%s" % job.id] 1444 1445 if op: 1446 parts.append(op.TinySummary()) 1447 1448 return "/".join(parts)
1449
1450 1451 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1452 """Simple class implementing a job-processing workerpool. 1453 1454 """
1455 - def __init__(self, queue):
1456 super(_JobQueueWorkerPool, self).__init__("Jq", 1457 JOBQUEUE_THREADS, 1458 _JobQueueWorker) 1459 self.queue = queue
1460
1461 1462 -class _JobDependencyManager:
1463 """Keeps track of job dependencies. 1464 1465 """ 1466 (WAIT, 1467 ERROR, 1468 CANCEL, 1469 CONTINUE, 1470 WRONGSTATUS) = range(1, 6) 1471
1472 - def __init__(self, getstatus_fn, enqueue_fn):
1473 """Initializes this class. 1474 1475 """ 1476 self._getstatus_fn = getstatus_fn 1477 self._enqueue_fn = enqueue_fn 1478 1479 self._waiters = {} 1480 self._lock = locking.SharedLock("JobDepMgr")
1481 1482 @locking.ssynchronized(_LOCK, shared=1)
1483 - def GetLockInfo(self, requested): # pylint: disable=W0613
1484 """Retrieves information about waiting jobs. 1485 1486 @type requested: set 1487 @param requested: Requested information, see C{query.LQ_*} 1488 1489 """ 1490 # No need to sort here, that's being done by the lock manager and query 1491 # library. There are no priorities for notifying jobs, hence all show up as 1492 # one item under "pending". 1493 return [("job/%s" % job_id, None, None, 1494 [("job", [job.id for job in waiters])]) 1495 for job_id, waiters in self._waiters.items() 1496 if waiters]
1497 1498 @locking.ssynchronized(_LOCK, shared=1)
1499 - def JobWaiting(self, job):
1500 """Checks if a job is waiting. 1501 1502 """ 1503 return compat.any(job in jobs 1504 for jobs in self._waiters.values())
1505 1506 @locking.ssynchronized(_LOCK)
1507 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1508 """Checks if a dependency job has the requested status. 1509 1510 If the other job is not yet in a finalized status, the calling job will be 1511 notified (re-added to the workerpool) at a later point. 1512 1513 @type job: L{_QueuedJob} 1514 @param job: Job object 1515 @type dep_job_id: int 1516 @param dep_job_id: ID of dependency job 1517 @type dep_status: list 1518 @param dep_status: Required status 1519 1520 """ 1521 assert ht.TJobId(job.id) 1522 assert ht.TJobId(dep_job_id) 1523 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1524 1525 if job.id == dep_job_id: 1526 return (self.ERROR, "Job can't depend on itself") 1527 1528 # Get status of dependency job 1529 try: 1530 status = self._getstatus_fn(dep_job_id) 1531 except errors.JobLost, err: 1532 return (self.ERROR, "Dependency error: %s" % err) 1533 1534 assert status in constants.JOB_STATUS_ALL 1535 1536 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1537 1538 if status not in constants.JOBS_FINALIZED: 1539 # Register for notification and wait for job to finish 1540 job_id_waiters.add(job) 1541 return (self.WAIT, 1542 "Need to wait for job %s, wanted status '%s'" % 1543 (dep_job_id, dep_status)) 1544 1545 # Remove from waiters list 1546 if job in job_id_waiters: 1547 job_id_waiters.remove(job) 1548 1549 if (status == constants.JOB_STATUS_CANCELED and 1550 constants.JOB_STATUS_CANCELED not in dep_status): 1551 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1552 1553 elif not dep_status or status in dep_status: 1554 return (self.CONTINUE, 1555 "Dependency job %s finished with status '%s'" % 1556 (dep_job_id, status)) 1557 1558 else: 1559 return (self.WRONGSTATUS, 1560 "Dependency job %s finished with status '%s'," 1561 " not one of '%s' as required" % 1562 (dep_job_id, status, utils.CommaJoin(dep_status)))
1563
1564 - def _RemoveEmptyWaitersUnlocked(self):
1565 """Remove all jobs without actual waiters. 1566 1567 """ 1568 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1569 if not waiters]: 1570 del self._waiters[job_id]
1571
1572 - def NotifyWaiters(self, job_id):
1573 """Notifies all jobs waiting for a certain job ID. 1574 1575 @attention: Do not call until L{CheckAndRegister} returned a status other 1576 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1577 @type job_id: int 1578 @param job_id: Job ID 1579 1580 """ 1581 assert ht.TJobId(job_id) 1582 1583 self._lock.acquire() 1584 try: 1585 self._RemoveEmptyWaitersUnlocked() 1586 1587 jobs = self._waiters.pop(job_id, None) 1588 finally: 1589 self._lock.release() 1590 1591 if jobs: 1592 # Re-add jobs to workerpool 1593 logging.debug("Re-adding %s jobs which were waiting for job %s", 1594 len(jobs), job_id) 1595 self._enqueue_fn(jobs)
1596
1597 1598 -def _RequireOpenQueue(fn):
1599 """Decorator for "public" functions. 1600 1601 This function should be used for all 'public' functions. That is, 1602 functions usually called from other classes. Note that this should 1603 be applied only to methods (not plain functions), since it expects 1604 that the decorated function is called with a first argument that has 1605 a '_queue_filelock' argument. 1606 1607 @warning: Use this decorator only after locking.ssynchronized 1608 1609 Example:: 1610 @locking.ssynchronized(_LOCK) 1611 @_RequireOpenQueue 1612 def Example(self): 1613 pass 1614 1615 """ 1616 def wrapper(self, *args, **kwargs): 1617 # pylint: disable=W0212 1618 assert self._queue_filelock is not None, "Queue should be open" 1619 return fn(self, *args, **kwargs)
1620 return wrapper 1621
1622 1623 -def _RequireNonDrainedQueue(fn):
1624 """Decorator checking for a non-drained queue. 1625 1626 To be used with functions submitting new jobs. 1627 1628 """ 1629 def wrapper(self, *args, **kwargs): 1630 """Wrapper function. 1631 1632 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1633 1634 """ 1635 # Ok when sharing the big job queue lock, as the drain file is created when 1636 # the lock is exclusive. 1637 # Needs access to protected member, pylint: disable=W0212 1638 if self._drained: 1639 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1640 1641 if not self._accepting_jobs: 1642 raise errors.JobQueueError("Job queue is shutting down, refusing job") 1643 1644 return fn(self, *args, **kwargs)
1645 return wrapper 1646
1647 1648 -class JobQueue(object):
1649 """Queue used to manage the jobs. 1650 1651 """
1652 - def __init__(self, context):
1653 """Constructor for JobQueue. 1654 1655 The constructor will initialize the job queue object and then 1656 start loading the current jobs from disk, either for starting them 1657 (if they were queue) or for aborting them (if they were already 1658 running). 1659 1660 @type context: GanetiContext 1661 @param context: the context object for access to the configuration 1662 data and other ganeti objects 1663 1664 """ 1665 self.context = context 1666 self._memcache = weakref.WeakValueDictionary() 1667 self._my_hostname = netutils.Hostname.GetSysName() 1668 1669 # The Big JobQueue lock. If a code block or method acquires it in shared 1670 # mode safe it must guarantee concurrency with all the code acquiring it in 1671 # shared mode, including itself. In order not to acquire it at all 1672 # concurrency must be guaranteed with all code acquiring it in shared mode 1673 # and all code acquiring it exclusively. 1674 self._lock = locking.SharedLock("JobQueue") 1675 1676 self.acquire = self._lock.acquire 1677 self.release = self._lock.release 1678 1679 # Accept jobs by default 1680 self._accepting_jobs = True 1681 1682 # Initialize the queue, and acquire the filelock. 1683 # This ensures no other process is working on the job queue. 1684 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 1685 1686 # Read serial file 1687 self._last_serial = jstore.ReadSerial() 1688 assert self._last_serial is not None, ("Serial file was modified between" 1689 " check in jstore and here") 1690 1691 # Get initial list of nodes 1692 self._nodes = dict((n.name, n.primary_ip) 1693 for n in self.context.cfg.GetAllNodesInfo().values() 1694 if n.master_candidate) 1695 1696 # Remove master node 1697 self._nodes.pop(self._my_hostname, None) 1698 1699 # TODO: Check consistency across nodes 1700 1701 self._queue_size = None 1702 self._UpdateQueueSizeUnlocked() 1703 assert ht.TInt(self._queue_size) 1704 self._drained = jstore.CheckDrainFlag() 1705 1706 # Job dependencies 1707 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1708 self._EnqueueJobs) 1709 self.context.glm.AddToLockMonitor(self.depmgr) 1710 1711 # Setup worker pool 1712 self._wpool = _JobQueueWorkerPool(self) 1713 try: 1714 self._InspectQueue() 1715 except: 1716 self._wpool.TerminateWorkers() 1717 raise
1718 1719 @locking.ssynchronized(_LOCK) 1720 @_RequireOpenQueue
1721 - def _InspectQueue(self):
1722 """Loads the whole job queue and resumes unfinished jobs. 1723 1724 This function needs the lock here because WorkerPool.AddTask() may start a 1725 job while we're still doing our work. 1726 1727 """ 1728 logging.info("Inspecting job queue") 1729 1730 restartjobs = [] 1731 1732 all_job_ids = self._GetJobIDsUnlocked() 1733 jobs_count = len(all_job_ids) 1734 lastinfo = time.time() 1735 for idx, job_id in enumerate(all_job_ids): 1736 # Give an update every 1000 jobs or 10 seconds 1737 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or 1738 idx == (jobs_count - 1)): 1739 logging.info("Job queue inspection: %d/%d (%0.1f %%)", 1740 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) 1741 lastinfo = time.time() 1742 1743 job = self._LoadJobUnlocked(job_id) 1744 1745 # a failure in loading the job can cause 'None' to be returned 1746 if job is None: 1747 continue 1748 1749 status = job.CalcStatus() 1750 1751 if status == constants.JOB_STATUS_QUEUED: 1752 restartjobs.append(job) 1753 1754 elif status in (constants.JOB_STATUS_RUNNING, 1755 constants.JOB_STATUS_WAITING, 1756 constants.JOB_STATUS_CANCELING): 1757 logging.warning("Unfinished job %s found: %s", job.id, job) 1758 1759 if status == constants.JOB_STATUS_WAITING: 1760 # Restart job 1761 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1762 restartjobs.append(job) 1763 else: 1764 to_encode = errors.OpExecError("Unclean master daemon shutdown") 1765 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1766 _EncodeOpError(to_encode)) 1767 job.Finalize() 1768 1769 self.UpdateJobUnlocked(job) 1770 1771 if restartjobs: 1772 logging.info("Restarting %s jobs", len(restartjobs)) 1773 self._EnqueueJobsUnlocked(restartjobs) 1774 1775 logging.info("Job queue inspection finished")
1776
1777 - def _GetRpc(self, address_list):
1778 """Gets RPC runner with context. 1779 1780 """ 1781 return rpc.JobQueueRunner(self.context, address_list)
1782 1783 @locking.ssynchronized(_LOCK) 1784 @_RequireOpenQueue
1785 - def AddNode(self, node):
1786 """Register a new node with the queue. 1787 1788 @type node: L{objects.Node} 1789 @param node: the node object to be added 1790 1791 """ 1792 node_name = node.name 1793 assert node_name != self._my_hostname 1794 1795 # Clean queue directory on added node 1796 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1797 msg = result.fail_msg 1798 if msg: 1799 logging.warning("Cannot cleanup queue directory on node %s: %s", 1800 node_name, msg) 1801 1802 if not node.master_candidate: 1803 # remove if existing, ignoring errors 1804 self._nodes.pop(node_name, None) 1805 # and skip the replication of the job ids 1806 return 1807 1808 # Upload the whole queue excluding archived jobs 1809 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1810 1811 # Upload current serial file 1812 files.append(pathutils.JOB_QUEUE_SERIAL_FILE) 1813 1814 # Static address list 1815 addrs = [node.primary_ip] 1816 1817 for file_name in files: 1818 # Read file content 1819 content = utils.ReadFile(file_name) 1820 1821 result = _CallJqUpdate(self._GetRpc(addrs), [node_name], 1822 file_name, content) 1823 msg = result[node_name].fail_msg 1824 if msg: 1825 logging.error("Failed to upload file %s to node %s: %s", 1826 file_name, node_name, msg) 1827 1828 # Set queue drained flag 1829 result = \ 1830 self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], 1831 self._drained) 1832 msg = result[node_name].fail_msg 1833 if msg: 1834 logging.error("Failed to set queue drained flag on node %s: %s", 1835 node_name, msg) 1836 1837 self._nodes[node_name] = node.primary_ip
1838 1839 @locking.ssynchronized(_LOCK) 1840 @_RequireOpenQueue
1841 - def RemoveNode(self, node_name):
1842 """Callback called when removing nodes from the cluster. 1843 1844 @type node_name: str 1845 @param node_name: the name of the node to remove 1846 1847 """ 1848 self._nodes.pop(node_name, None)
1849 1850 @staticmethod
1851 - def _CheckRpcResult(result, nodes, failmsg):
1852 """Verifies the status of an RPC call. 1853 1854 Since we aim to keep consistency should this node (the current 1855 master) fail, we will log errors if our rpc fail, and especially 1856 log the case when more than half of the nodes fails. 1857 1858 @param result: the data as returned from the rpc call 1859 @type nodes: list 1860 @param nodes: the list of nodes we made the call to 1861 @type failmsg: str 1862 @param failmsg: the identifier to be used for logging 1863 1864 """ 1865 failed = [] 1866 success = [] 1867 1868 for node in nodes: 1869 msg = result[node].fail_msg 1870 if msg: 1871 failed.append(node) 1872 logging.error("RPC call %s (%s) failed on node %s: %s", 1873 result[node].call, failmsg, node, msg) 1874 else: 1875 success.append(node) 1876 1877 # +1 for the master node 1878 if (len(success) + 1) < len(failed): 1879 # TODO: Handle failing nodes 1880 logging.error("More than half of the nodes failed")
1881
1882 - def _GetNodeIp(self):
1883 """Helper for returning the node name/ip list. 1884 1885 @rtype: (list, list) 1886 @return: a tuple of two lists, the first one with the node 1887 names and the second one with the node addresses 1888 1889 """ 1890 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1891 name_list = self._nodes.keys() 1892 addr_list = [self._nodes[name] for name in name_list] 1893 return name_list, addr_list
1894
1895 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1896 """Writes a file locally and then replicates it to all nodes. 1897 1898 This function will replace the contents of a file on the local 1899 node and then replicate it to all the other nodes we have. 1900 1901 @type file_name: str 1902 @param file_name: the path of the file to be replicated 1903 @type data: str 1904 @param data: the new contents of the file 1905 @type replicate: boolean 1906 @param replicate: whether to spread the changes to the remote nodes 1907 1908 """ 1909 getents = runtime.GetEnts() 1910 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1911 gid=getents.daemons_gid, 1912 mode=constants.JOB_QUEUE_FILES_PERMS) 1913 1914 if replicate: 1915 names, addrs = self._GetNodeIp() 1916 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1917 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1918
1919 - def _RenameFilesUnlocked(self, rename):
1920    """Renames files locally and then replicates the changes. 1921  1922    This function will rename files in the local queue directory 1923    and then replicate these renames to all the other nodes we have. 1924  1925    @type rename: list of (old, new) 1926    @param rename: List containing tuples mapping old to new names 1927  1928    """ 1929    # Rename them locally 1930    for old, new in rename: 1931      utils.RenameFile(old, new, mkdir=True) 1932  1933    # ... and on all nodes 1934    names, addrs = self._GetNodeIp() 1935    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1936    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1937
1938 - def _NewSerialsUnlocked(self, count):
1939 """Generates a new job identifier. 1940 1941 Job identifiers are unique during the lifetime of a cluster. 1942 1943 @type count: integer 1944 @param count: how many serials to return 1945 @rtype: list of int 1946 @return: a list of job identifiers. 1947 1948 """ 1949 assert ht.TNonNegativeInt(count) 1950 1951 # New number 1952 serial = self._last_serial + count 1953 1954 # Write to file 1955 self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE, 1956 "%s\n" % serial, True) 1957 1958 result = [jstore.FormatJobID(v) 1959 for v in range(self._last_serial + 1, serial + 1)] 1960 1961 # Keep it only if we were able to write the file 1962 self._last_serial = serial 1963 1964 assert len(result) == count 1965 1966 return result
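With hypothetical numbers: if the serial last written to disk is 41 and three IDs are requested, the serial file is rewritten to contain 44 and the identifiers 42, 43 and 44 are handed out. A minimal sketch of just the arithmetic (the real code additionally formats each number via jstore.FormatJobID and only commits the new serial after the file has been replicated):

  def new_serials(last_serial, count):
      """Return (new last serial, list of freshly allocated serials)."""
      serial = last_serial + count
      return serial, list(range(last_serial + 1, serial + 1))

  new_last, ids = new_serials(41, 3)
  assert new_last == 44
  assert ids == [42, 43, 44]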
1967 1968 @staticmethod
1969 - def _GetJobPath(job_id):
1970 """Returns the job file for a given job id. 1971 1972 @type job_id: str 1973 @param job_id: the job identifier 1974 @rtype: str 1975 @return: the path to the job file 1976 1977 """ 1978 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1979 1980 @staticmethod
1981 - def _GetArchivedJobPath(job_id):
1982    """Returns the archived job file for a given job id. 1983  1984    @type job_id: str 1985    @param job_id: the job identifier 1986    @rtype: str 1987    @return: the path to the archived job file 1988  1989    """ 1990    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1991                          jstore.GetArchiveDirectory(job_id), 1992                          "job-%s" % job_id)
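Assuming a hypothetical queue directory, the two helpers build paths along these lines (the archive subdirectory is whatever jstore.GetArchiveDirectory returns for the ID; "0" below is only an example value):

  import os

  QUEUE_DIR = "/var/lib/ganeti/queue"          # hypothetical location
  ARCHIVE_DIR = os.path.join(QUEUE_DIR, "archive")

  def job_path(job_id):
      return os.path.join(QUEUE_DIR, "job-%s" % job_id)

  def archived_job_path(job_id, archive_subdir):
      # archive_subdir stands in for jstore.GetArchiveDirectory(job_id)
      return os.path.join(ARCHIVE_DIR, archive_subdir, "job-%s" % job_id)

  assert job_path(42) == "/var/lib/ganeti/queue/job-42"
  assert archived_job_path(42, "0") == "/var/lib/ganeti/queue/archive/0/job-42"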
1993 1994 @staticmethod
1995 - def _DetermineJobDirectories(archived):
1996 """Build list of directories containing job files. 1997 1998 @type archived: bool 1999 @param archived: Whether to include directories for archived jobs 2000 @rtype: list 2001 2002 """ 2003 result = [pathutils.QUEUE_DIR] 2004 2005 if archived: 2006 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 2007 result.extend(map(compat.partial(utils.PathJoin, archive_path), 2008 utils.ListVisibleFiles(archive_path))) 2009 2010 return result
2011 2012 @classmethod
2013 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2014 """Return all known job IDs. 2015 2016 The method only looks at disk because it's a requirement that all 2017 jobs are present on disk (so in the _memcache we don't have any 2018 extra IDs). 2019 2020 @type sort: boolean 2021 @param sort: perform sorting on the returned job ids 2022 @rtype: list 2023 @return: the list of job IDs 2024 2025 """ 2026 jlist = [] 2027 2028 for path in cls._DetermineJobDirectories(archived): 2029 for filename in utils.ListVisibleFiles(path): 2030 m = constants.JOB_FILE_RE.match(filename) 2031 if m: 2032 jlist.append(int(m.group(1))) 2033 2034 if sort: 2035 jlist.sort() 2036 return jlist
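A standalone sketch of the directory scan (editorial; the real pattern is constants.JOB_FILE_RE, assumed here to match file names of the form job-<number>):

  import re

  JOB_FILE_RE = re.compile(r"^job-(\d+)$")   # assumed shape of the real pattern

  def job_ids_from_listing(filenames, sort=True):
      """Extract numeric job IDs from a directory listing."""
      jlist = []
      for filename in filenames:
          m = JOB_FILE_RE.match(filename)
          if m:
              jlist.append(int(m.group(1)))
      if sort:
          jlist.sort()
      return jlist

  listing = ["job-7", "job-42", "serial", "lock", "job-9"]
  assert job_ids_from_listing(listing) == [7, 9, 42]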
2037
2038 - def _LoadJobUnlocked(self, job_id):
2039 """Loads a job from the disk or memory. 2040 2041 Given a job id, this will return the cached job object if 2042 existing, or try to load the job from the disk. If loading from 2043 disk, it will also add the job to the cache. 2044 2045 @type job_id: int 2046 @param job_id: the job id 2047 @rtype: L{_QueuedJob} or None 2048 @return: either None or the job object 2049 2050 """ 2051 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 2052 2053 job = self._memcache.get(job_id, None) 2054 if job: 2055 logging.debug("Found job %s in memcache", job_id) 2056 assert job.writable, "Found read-only job in memcache" 2057 return job 2058 2059 try: 2060 job = self._LoadJobFromDisk(job_id, False) 2061 if job is None: 2062 return job 2063 except errors.JobFileCorrupted: 2064 old_path = self._GetJobPath(job_id) 2065 new_path = self._GetArchivedJobPath(job_id) 2066 if old_path == new_path: 2067 # job already archived (future case) 2068 logging.exception("Can't parse job %s", job_id) 2069 else: 2070 # non-archived case 2071 logging.exception("Can't parse job %s, will archive.", job_id) 2072 self._RenameFilesUnlocked([(old_path, new_path)]) 2073 return None 2074 2075 assert job.writable, "Job just loaded is not writable" 2076 2077 self._memcache[job_id] = job 2078 logging.debug("Added job %s to the cache", job_id) 2079 return job
2080
2081 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2082 """Load the given job file from disk. 2083 2084 Given a job file, read, load and restore it in a _QueuedJob format. 2085 2086 @type job_id: int 2087 @param job_id: job identifier 2088 @type try_archived: bool 2089 @param try_archived: Whether to try loading an archived job 2090 @rtype: L{_QueuedJob} or None 2091 @return: either None or the job object 2092 2093 """ 2094 path_functions = [(self._GetJobPath, False)] 2095 2096 if try_archived: 2097 path_functions.append((self._GetArchivedJobPath, True)) 2098 2099 raw_data = None 2100 archived = None 2101 2102 for (fn, archived) in path_functions: 2103 filepath = fn(job_id) 2104 logging.debug("Loading job from %s", filepath) 2105 try: 2106 raw_data = utils.ReadFile(filepath) 2107 except EnvironmentError, err: 2108 if err.errno != errno.ENOENT: 2109 raise 2110 else: 2111 break 2112 2113 if not raw_data: 2114 return None 2115 2116 if writable is None: 2117 writable = not archived 2118 2119 try: 2120 data = serializer.LoadJson(raw_data) 2121 job = _QueuedJob.Restore(self, data, writable, archived) 2122 except Exception, err: # pylint: disable=W0703 2123 raise errors.JobFileCorrupted(err) 2124 2125 return job
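The fallback order matters: the live path is always tried first, the archive only when try_archived is set, and writable defaults to True for live jobs and False for archived ones. A simplified sketch of that control flow (editorial; file access and JSON restoring are replaced by a caller-supplied read function):

  import errno

  def load_raw_job(job_id, try_archived, read_file, writable=None):
      """Return (raw_data, writable) or None, trying live then archived paths.

      read_file stands in for utils.ReadFile and must raise EnvironmentError
      with errno.ENOENT for missing files.
      """
      paths = [("job-%s" % job_id, False)]
      if try_archived:
          paths.append(("archive/job-%s" % job_id, True))

      for path, archived in paths:
          try:
              raw_data = read_file(path)
          except EnvironmentError as err:
              if err.errno != errno.ENOENT:
                  raise
          else:
              break
      else:
          return None

      if writable is None:
          writable = not archived
      return (raw_data, writable)

  files = {"archive/job-5": "{}"}

  def fake_read(path):
      if path not in files:
          raise EnvironmentError(errno.ENOENT, "No such file", path)
      return files[path]

  assert load_raw_job(5, True, fake_read) == ("{}", False)
  assert load_raw_job(6, True, fake_read) is None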
2126
2127 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2128 """Load the given job file from disk. 2129 2130 Given a job file, read, load and restore it in a _QueuedJob format. 2131 In case of error reading the job, it gets returned as None, and the 2132 exception is logged. 2133 2134 @type job_id: int 2135 @param job_id: job identifier 2136 @type try_archived: bool 2137 @param try_archived: Whether to try loading an archived job 2138 @rtype: L{_QueuedJob} or None 2139 @return: either None or the job object 2140 2141 """ 2142 try: 2143 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 2144 except (errors.JobFileCorrupted, EnvironmentError): 2145 logging.exception("Can't load/parse job %s", job_id) 2146 return None
2147
2148 - def _UpdateQueueSizeUnlocked(self):
2149 """Update the queue size. 2150 2151 """ 2152 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2153 2154 @locking.ssynchronized(_LOCK) 2155 @_RequireOpenQueue
2156 - def SetDrainFlag(self, drain_flag):
2157 """Sets the drain flag for the queue. 2158 2159 @type drain_flag: boolean 2160 @param drain_flag: Whether to set or unset the drain flag 2161 2162 """ 2163 # Change flag locally 2164 jstore.SetDrainFlag(drain_flag) 2165 2166 self._drained = drain_flag 2167 2168 # ... and on all nodes 2169 (names, addrs) = self._GetNodeIp() 2170 result = \ 2171 self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) 2172 self._CheckRpcResult(result, self._nodes, 2173 "Setting queue drain flag to %s" % drain_flag) 2174 2175 return True
2176 2177 @_RequireOpenQueue
2178 - def _SubmitJobUnlocked(self, job_id, ops):
2179 """Create and store a new job. 2180 2181 This enters the job into our job queue and also puts it on the new 2182 queue, in order for it to be picked up by the queue processors. 2183 2184 @type job_id: job ID 2185 @param job_id: the job ID for the new job 2186 @type ops: list 2187 @param ops: The list of OpCodes that will become the new job. 2188 @rtype: L{_QueuedJob} 2189 @return: the job object to be queued 2190 @raise errors.JobQueueFull: if the job queue has too many jobs in it 2191 @raise errors.GenericError: If an opcode is not valid 2192 2193 """ 2194 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: 2195 raise errors.JobQueueFull() 2196 2197 job = _QueuedJob(self, job_id, ops, True) 2198 2199 for idx, op in enumerate(job.ops): 2200 # Check priority 2201 if op.priority not in constants.OP_PRIO_SUBMIT_VALID: 2202 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2203 raise errors.GenericError("Opcode %s has invalid priority %s, allowed" 2204 " are %s" % (idx, op.priority, allowed)) 2205 2206 # Check job dependencies 2207 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) 2208 if not opcodes.TNoRelativeJobDependencies(dependencies): 2209 raise errors.GenericError("Opcode %s has invalid dependencies, must" 2210 " match %s: %s" % 2211 (idx, opcodes.TNoRelativeJobDependencies, 2212 dependencies)) 2213 2214 # Write to disk 2215 self.UpdateJobUnlocked(job) 2216 2217 self._queue_size += 1 2218 2219 logging.debug("Adding new job %s to the cache", job_id) 2220 self._memcache[job_id] = job 2221 2222 return job
2223 2224 @locking.ssynchronized(_LOCK) 2225 @_RequireOpenQueue 2226 @_RequireNonDrainedQueue
2227 - def SubmitJob(self, ops):
2228 """Create and store a new job. 2229 2230 @see: L{_SubmitJobUnlocked} 2231 2232 """ 2233 (job_id, ) = self._NewSerialsUnlocked(1) 2234 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) 2235 return job_id
2236 2237 @locking.ssynchronized(_LOCK) 2238 @_RequireOpenQueue 2239 @_RequireNonDrainedQueue
2240 - def SubmitManyJobs(self, jobs):
2241 """Create and store multiple jobs. 2242 2243 @see: L{_SubmitJobUnlocked} 2244 2245 """ 2246 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 2247 2248 (results, added_jobs) = \ 2249 self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) 2250 2251 self._EnqueueJobsUnlocked(added_jobs) 2252 2253 return results
2254 2255 @staticmethod
2256 - def _FormatSubmitError(msg, ops):
2257 """Formats errors which occurred while submitting a job. 2258 2259 """ 2260 return ("%s; opcodes %s" % 2261 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2262 2263 @staticmethod
2264 - def _ResolveJobDependencies(resolve_fn, deps):
2265 """Resolves relative job IDs in dependencies. 2266 2267 @type resolve_fn: callable 2268 @param resolve_fn: Function to resolve a relative job ID 2269 @type deps: list 2270 @param deps: Dependencies 2271 @rtype: tuple; (boolean, string or list) 2272 @return: If successful (first tuple item), the returned list contains 2273 resolved job IDs along with the requested status; if not successful, 2274 the second element is an error message 2275 2276 """ 2277 result = [] 2278 2279 for (dep_job_id, dep_status) in deps: 2280 if ht.TRelativeJobId(dep_job_id): 2281 assert ht.TInt(dep_job_id) and dep_job_id < 0 2282 try: 2283 job_id = resolve_fn(dep_job_id) 2284 except IndexError: 2285 # Abort 2286 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 2287 else: 2288 job_id = dep_job_id 2289 2290 result.append((job_id, dep_status)) 2291 2292 return (True, result)
2293
2294 - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2295 """Create and store multiple jobs. 2296 2297 @see: L{_SubmitJobUnlocked} 2298 2299 """ 2300 results = [] 2301 added_jobs = [] 2302 2303 def resolve_fn(job_idx, reljobid): 2304 assert reljobid < 0 2305 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2306 2307 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): 2308 for op in ops: 2309 if getattr(op, opcodes.DEPEND_ATTR, None): 2310 (status, data) = \ 2311 self._ResolveJobDependencies(compat.partial(resolve_fn, idx), 2312 op.depends) 2313 if not status: 2314 # Abort resolving dependencies 2315 assert ht.TNonEmptyString(data), "No error message" 2316 break 2317 # Use resolved dependencies 2318 op.depends = data 2319 else: 2320 try: 2321 job = self._SubmitJobUnlocked(job_id, ops) 2322 except errors.GenericError, err: 2323 status = False 2324 data = self._FormatSubmitError(str(err), ops) 2325 else: 2326 status = True 2327 data = job_id 2328 added_jobs.append(job) 2329 2330 results.append((status, data)) 2331 2332 return (results, added_jobs)
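Relative job dependencies are negative indices into the jobs submitted so far: earlier submissions (previous_job_ids) followed by the jobs of the current batch that precede the opcode's own job. A worked example with made-up job IDs:

  def resolve(previous_job_ids, job_ids, job_idx, reljobid):
      """Map a negative, relative job ID onto a concrete job ID."""
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

  previous = [100, 101]      # IDs from earlier submissions (hypothetical)
  batch = [102, 103, 104]    # IDs reserved for the current batch

  # An opcode in the third job of the batch depending on "the previous job":
  assert resolve(previous, batch, 2, -1) == 103
  # ... and on "three jobs back", reaching into an earlier submission:
  assert resolve(previous, batch, 2, -3) == 101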
2333 2334 @locking.ssynchronized(_LOCK)
2335 - def _EnqueueJobs(self, jobs):
2336 """Helper function to add jobs to worker pool's queue. 2337 2338 @type jobs: list 2339 @param jobs: List of all jobs 2340 2341 """ 2342 return self._EnqueueJobsUnlocked(jobs)
2343
2344 - def _EnqueueJobsUnlocked(self, jobs):
2345 """Helper function to add jobs to worker pool's queue. 2346 2347 @type jobs: list 2348 @param jobs: List of all jobs 2349 2350 """ 2351 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 2352 self._wpool.AddManyTasks([(job, ) for job in jobs], 2353 priority=[job.CalcPriority() for job in jobs], 2354 task_id=map(_GetIdAttr, jobs))
2355
2356 - def _GetJobStatusForDependencies(self, job_id):
2357    """Gets the status of a job for dependencies. 2358  2359    @type job_id: int 2360    @param job_id: Job ID 2361    @raise errors.JobLost: If job can't be found 2362  2363    """ 2364    # Not using in-memory cache as doing so would require an exclusive lock 2365  2366    # Try to load from disk 2367    job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2368  2369    assert job is None or not job.writable, "Got writable job" 2370  2371    if job: 2372      return job.CalcStatus() 2373  2374    raise errors.JobLost("Job %s not found" % job_id)
2375 2376 @_RequireOpenQueue
2377 - def UpdateJobUnlocked(self, job, replicate=True):
2378 """Update a job's on disk storage. 2379 2380 After a job has been modified, this function needs to be called in 2381 order to write the changes to disk and replicate them to the other 2382 nodes. 2383 2384 @type job: L{_QueuedJob} 2385 @param job: the changed job 2386 @type replicate: boolean 2387 @param replicate: whether to replicate the change to remote nodes 2388 2389 """ 2390 if __debug__: 2391 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 2392 assert (finalized ^ (job.end_timestamp is None)) 2393 assert job.writable, "Can't update read-only job" 2394 assert not job.archived, "Can't update archived job" 2395 2396 filename = self._GetJobPath(job.id) 2397 data = serializer.DumpJson(job.Serialize()) 2398 logging.debug("Writing job %s to %s", job.id, filename) 2399 self._UpdateJobQueueFile(filename, data, replicate)
2400
2401 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 2402 timeout):
2403 """Waits for changes in a job. 2404 2405 @type job_id: int 2406 @param job_id: Job identifier 2407 @type fields: list of strings 2408 @param fields: Which fields to check for changes 2409 @type prev_job_info: list or None 2410 @param prev_job_info: Last job information returned 2411 @type prev_log_serial: int 2412 @param prev_log_serial: Last job message serial number 2413 @type timeout: float 2414 @param timeout: maximum time to wait in seconds 2415 @rtype: tuple (job info, log entries) 2416 @return: a tuple of the job information as required via 2417 the fields parameter, and the log entries as a list 2418 2419 if the job has not changed and the timeout has expired, 2420 we instead return a special value, 2421 L{constants.JOB_NOTCHANGED}, which should be interpreted 2422 as such by the clients 2423 2424 """ 2425 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, 2426 writable=False) 2427 2428 helper = _WaitForJobChangesHelper() 2429 2430 return helper(self._GetJobPath(job_id), load_fn, 2431 fields, prev_job_info, prev_log_serial, timeout)
2432 2433 @locking.ssynchronized(_LOCK) 2434 @_RequireOpenQueue
2435 - def CancelJob(self, job_id):
2436 """Cancels a job. 2437 2438 This will only succeed if the job has not started yet. 2439 2440 @type job_id: int 2441 @param job_id: job ID of job to be cancelled. 2442 2443 """ 2444 logging.info("Cancelling job %s", job_id) 2445 2446 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2447 2448 @locking.ssynchronized(_LOCK) 2449 @_RequireOpenQueue
2450 - def ChangeJobPriority(self, job_id, priority):
2451 """Changes a job's priority. 2452 2453 @type job_id: int 2454 @param job_id: ID of the job whose priority should be changed 2455 @type priority: int 2456 @param priority: New priority 2457 2458 """ 2459 logging.info("Changing priority of job %s to %s", job_id, priority) 2460 2461 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2462 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2463 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2464 (priority, allowed)) 2465 2466 def fn(job): 2467 (success, msg) = job.ChangePriority(priority) 2468 2469 if success: 2470 try: 2471 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2472 except workerpool.NoSuchTask: 2473 logging.debug("Job %s is not in workerpool at this time", job.id) 2474 2475 return (success, msg)
2476 2477 return self._ModifyJobUnlocked(job_id, fn) 2478
2479 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2480 """Modifies a job. 2481 2482 @type job_id: int 2483 @param job_id: Job ID 2484 @type mod_fn: callable 2485 @param mod_fn: Modifying function, receiving job object as parameter, 2486 returning tuple of (status boolean, message string) 2487 2488 """ 2489 job = self._LoadJobUnlocked(job_id) 2490 if not job: 2491 logging.debug("Job %s not found", job_id) 2492 return (False, "Job %s not found" % job_id) 2493 2494 assert job.writable, "Can't modify read-only job" 2495 assert not job.archived, "Can't modify archived job" 2496 2497 (success, msg) = mod_fn(job) 2498 2499 if success: 2500 # If the job was finalized (e.g. cancelled), this is the final write 2501 # allowed. The job can be archived anytime. 2502 self.UpdateJobUnlocked(job) 2503 2504 return (success, msg)
2505 2506 @_RequireOpenQueue
2507 - def _ArchiveJobsUnlocked(self, jobs):
2508 """Archives jobs. 2509 2510 @type jobs: list of L{_QueuedJob} 2511 @param jobs: Job objects 2512 @rtype: int 2513 @return: Number of archived jobs 2514 2515 """ 2516 archive_jobs = [] 2517 rename_files = [] 2518 for job in jobs: 2519 assert job.writable, "Can't archive read-only job" 2520 assert not job.archived, "Can't cancel archived job" 2521 2522 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2523 logging.debug("Job %s is not yet done", job.id) 2524 continue 2525 2526 archive_jobs.append(job) 2527 2528 old = self._GetJobPath(job.id) 2529 new = self._GetArchivedJobPath(job.id) 2530 rename_files.append((old, new)) 2531 2532 # TODO: What if 1..n files fail to rename? 2533 self._RenameFilesUnlocked(rename_files) 2534 2535 logging.debug("Successfully archived job(s) %s", 2536 utils.CommaJoin(job.id for job in archive_jobs)) 2537 2538 # Since we haven't quite checked, above, if we succeeded or failed renaming 2539 # the files, we update the cached queue size from the filesystem. When we 2540 # get around to fix the TODO: above, we can use the number of actually 2541 # archived jobs to fix this. 2542 self._UpdateQueueSizeUnlocked() 2543 return len(archive_jobs)
2544 2545 @locking.ssynchronized(_LOCK) 2546 @_RequireOpenQueue
2547 - def ArchiveJob(self, job_id):
2548 """Archives a job. 2549 2550 This is just a wrapper over L{_ArchiveJobsUnlocked}. 2551 2552 @type job_id: int 2553 @param job_id: Job ID of job to be archived. 2554 @rtype: bool 2555 @return: Whether job was archived 2556 2557 """ 2558 logging.info("Archiving job %s", job_id) 2559 2560 job = self._LoadJobUnlocked(job_id) 2561 if not job: 2562 logging.debug("Job %s not found", job_id) 2563 return False 2564 2565 return self._ArchiveJobsUnlocked([job]) == 1
2566 2567 @locking.ssynchronized(_LOCK) 2568 @_RequireOpenQueue
2569 - def AutoArchiveJobs(self, age, timeout):
2570 """Archives all jobs based on age. 2571 2572 The method will archive all jobs which are older than the age 2573 parameter. For jobs that don't have an end timestamp, the start 2574 timestamp will be considered. The special '-1' age will cause 2575 archival of all jobs (that are not running or queued). 2576 2577 @type age: int 2578 @param age: the minimum age in seconds 2579 2580 """ 2581 logging.info("Archiving jobs with age more than %s seconds", age) 2582 2583 now = time.time() 2584 end_time = now + timeout 2585 archived_count = 0 2586 last_touched = 0 2587 2588 all_job_ids = self._GetJobIDsUnlocked() 2589 pending = [] 2590 for idx, job_id in enumerate(all_job_ids): 2591 last_touched = idx + 1 2592 2593 # Not optimal because jobs could be pending 2594 # TODO: Measure average duration for job archival and take number of 2595 # pending jobs into account. 2596 if time.time() > end_time: 2597 break 2598 2599 # Returns None if the job failed to load 2600 job = self._LoadJobUnlocked(job_id) 2601 if job: 2602 if job.end_timestamp is None: 2603 if job.start_timestamp is None: 2604 job_age = job.received_timestamp 2605 else: 2606 job_age = job.start_timestamp 2607 else: 2608 job_age = job.end_timestamp 2609 2610 if age == -1 or now - job_age[0] > age: 2611 pending.append(job) 2612 2613 # Archive 10 jobs at a time 2614 if len(pending) >= 10: 2615 archived_count += self._ArchiveJobsUnlocked(pending) 2616 pending = [] 2617 2618 if pending: 2619 archived_count += self._ArchiveJobsUnlocked(pending) 2620 2621 return (archived_count, len(all_job_ids) - last_touched)
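The reference timestamp for a job's age is the most specific one available: end, else start, else received. A small standalone sketch of that selection and the age test (timestamps are (seconds, microseconds) tuples as produced by TimeStampNow; the numbers are made up):

  import time

  def job_age_seconds(received_ts, start_ts, end_ts, now):
      """Return how long ago the job's reference timestamp was, in seconds."""
      if end_ts is not None:
          ref = end_ts
      elif start_ts is not None:
          ref = start_ts
      else:
          ref = received_ts
      return now - ref[0]

  # A job received 3600s ago, started 3500s ago and never finished:
  now = time.time()
  age = job_age_seconds((now - 3600, 0), (now - 3500, 0), None, now)
  assert 3499 < age < 3501

  # With a minimum age of 1800 seconds this job qualifies for archival;
  # the special value -1 would archive it regardless of its age.
  assert age > 1800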
2622
2623 - def _Query(self, fields, qfilter):
2624 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2625 namefield="id") 2626 2627 # Archived jobs are only looked at if the "archived" field is referenced 2628 # either as a requested field or in the filter. By default archived jobs 2629 # are ignored. 2630 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2631 2632 job_ids = qobj.RequestedNames() 2633 2634 list_all = (job_ids is None) 2635 2636 if list_all: 2637 # Since files are added to/removed from the queue atomically, there's no 2638 # risk of getting the job ids in an inconsistent state. 2639 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2640 2641 jobs = [] 2642 2643 for job_id in job_ids: 2644 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2645 if job is not None or not list_all: 2646 jobs.append((job_id, job)) 2647 2648 return (qobj, jobs, list_all)
2649
2650 - def QueryJobs(self, fields, qfilter):
2651 """Returns a list of jobs in queue. 2652 2653 @type fields: sequence 2654 @param fields: List of wanted fields 2655 @type qfilter: None or query2 filter (list) 2656 @param qfilter: Query filter 2657 2658 """ 2659 (qobj, ctx, _) = self._Query(fields, qfilter) 2660 2661 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2662
2663 - def OldStyleQueryJobs(self, job_ids, fields):
2664 """Returns a list of jobs in queue. 2665 2666 @type job_ids: list 2667 @param job_ids: sequence of job identifiers or None for all 2668 @type fields: list 2669 @param fields: names of fields to return 2670 @rtype: list 2671 @return: list one element per job, each element being list with 2672 the requested fields 2673 2674 """ 2675 # backwards compat: 2676 job_ids = [int(jid) for jid in job_ids] 2677 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2678 2679 (qobj, ctx, _) = self._Query(fields, qfilter) 2680 2681 return qobj.OldStyleQuery(ctx, sort_by_name=False)
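The backwards-compatible entry point only converts the ID list into a query2 filter before delegating to the same _Query machinery. A simplified stand-in showing the assumed shape of that conversion (the real helper is qlang.MakeSimpleFilter; the list-based filter syntax below is an assumption for illustration):

  def make_simple_filter(namefield, values):
      """Simplified stand-in for qlang.MakeSimpleFilter (assumed behaviour)."""
      if values is None:
          return None
      return ["|"] + [["=", namefield, value] for value in values]

  # An old-style request for jobs 3 and 7 becomes an OR of equality checks:
  assert make_simple_filter("id", [3, 7]) == \
      ["|", ["=", "id", 3], ["=", "id", 7]]
  # None (meaning "all jobs") yields no filter at all:
  assert make_simple_filter("id", None) is None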
2682 2683 @locking.ssynchronized(_LOCK)
2684 - def PrepareShutdown(self):
2685    """Prepare to stop the job queue. 2686  2687    Disables execution of jobs in the workerpool and returns whether there are 2688    any jobs currently running. If the latter is the case, the job queue is not 2689    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can 2690    be called without interfering with any job. Queued and unfinished jobs will 2691    be resumed next time. 2692  2693    Once this function has been called no new job submissions will be accepted 2694    (see L{_RequireNonDrainedQueue}). 2695  2696    @rtype: bool 2697    @return: Whether there are any running jobs 2698  2699    """ 2700    if self._accepting_jobs: 2701      self._accepting_jobs = False 2702  2703      # Tell worker pool to stop processing pending tasks 2704      self._wpool.SetActive(False) 2705  2706    return self._wpool.HasRunningTasks()
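Seen from a caller, shutdown is a two-phase protocol: keep calling PrepareShutdown until it reports that no jobs are running, then call Shutdown (defined below). A hypothetical polling loop, not how the master daemon actually drives it:

  import time

  def shut_down_queue(queue, poll_interval=0.5, max_wait=60):
      """Politely stop a job queue: wait for running jobs, then terminate."""
      deadline = time.time() + max_wait
      while queue.PrepareShutdown():    # True while jobs are still running
          if time.time() > deadline:
              raise RuntimeError("Jobs still running after %s seconds" % max_wait)
          time.sleep(poll_interval)
      queue.Shutdown()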
2707
2708 - def AcceptingJobsUnlocked(self):
2709 """Returns whether jobs are accepted. 2710 2711 Once L{PrepareShutdown} has been called, no new jobs are accepted and the 2712 queue is shutting down. 2713 2714 @rtype: bool 2715 2716 """ 2717 return self._accepting_jobs
2718 2719 @locking.ssynchronized(_LOCK) 2720 @_RequireOpenQueue
2721 - def Shutdown(self):
2722    """Stops the job queue. 2723  2724    This shuts down all the worker threads and closes the queue. 2725  2726    """ 2727    self._wpool.TerminateWorkers() 2728  2729    self._queue_filelock.Close() 2730    self._queue_filelock = None
2731