
Source Code for Package ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  """ 
  34   
  35  import logging 
  36  import errno 
  37  import time 
  38  import weakref 
  39  import threading 
  40  import itertools 
  41  import operator 
  42  import os 
  43   
  44  try: 
  45    # pylint: disable=E0611 
  46    from pyinotify import pyinotify 
  47  except ImportError: 
  48    import pyinotify 
  49   
  50  from ganeti import asyncnotifier 
  51  from ganeti import constants 
  52  from ganeti import serializer 
  53  from ganeti import locking 
  54  from ganeti import luxi 
  55  from ganeti import opcodes 
  56  from ganeti import opcodes_base 
  57  from ganeti import errors 
  58  from ganeti import mcpu 
  59  from ganeti import utils 
  60  from ganeti import jstore 
  61  import ganeti.rpc.node as rpc 
  62  from ganeti import runtime 
  63  from ganeti import netutils 
  64  from ganeti import compat 
  65  from ganeti import ht 
  66  from ganeti import query 
  67  from ganeti import qlang 
  68  from ganeti import pathutils 
  69  from ganeti import vcluster 
  70  from ganeti.cmdlib import cluster 
  71   
  72   
  73  #: Retrieves "id" attribute 
  74  _GetIdAttr = operator.attrgetter("id") 
75 76 77 -class CancelJob(Exception):
78 """Special exception to cancel a job. 79 80 """
81
82 83 -def TimeStampNow():
84 """Returns the current timestamp. 85 86 @rtype: tuple 87 @return: the current time in the (seconds, microseconds) format 88 89 """ 90 return utils.SplitTime(time.time())
91
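# Illustrative sketch, not part of the module: the (seconds, microseconds)
# timestamp format returned by TimeStampNow() above.  The helper below is an
# assumption about what utils.SplitTime does, written with the standard
# library only so the example runs on its own.
import time

def _split_time_sketch(value):
  # Split a float timestamp into whole seconds and the microsecond remainder
  seconds = int(value)
  microseconds = int((value - seconds) * 1000000)
  return (seconds, microseconds)

print _split_time_sketch(time.time())  # e.g. (1419000000, 123456)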
92 93 -def _CallJqUpdate(runner, names, file_name, content):
94 """Updates job queue file after virtualizing filename. 95 96 """ 97 virt_file_name = vcluster.MakeVirtualPath(file_name) 98 return runner.call_jobqueue_update(names, virt_file_name, content)
99
100 101 -class _QueuedOpCode(object):
102 """Encapsulates an opcode object. 103 104 @ivar log: holds the execution log and consists of tuples 105 of the form C{(log_serial, timestamp, level, message)} 106 @ivar input: the OpCode we encapsulate 107 @ivar status: the current status 108 @ivar result: the result of the LU execution 109 @ivar start_timestamp: timestamp for the start of the execution 110 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 111 @ivar stop_timestamp: timestamp for the end of the execution 112 113 """ 114 __slots__ = ["input", "status", "result", "log", "priority", 115 "start_timestamp", "exec_timestamp", "end_timestamp", 116 "__weakref__"] 117
118 - def __init__(self, op):
119 """Initializes instances of this class. 120 121 @type op: L{opcodes.OpCode} 122 @param op: the opcode we encapsulate 123 124 """ 125 self.input = op 126 self.status = constants.OP_STATUS_QUEUED 127 self.result = None 128 self.log = [] 129 self.start_timestamp = None 130 self.exec_timestamp = None 131 self.end_timestamp = None 132 133 # Get initial priority (it might change during the lifetime of this opcode) 134 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
135 136 @classmethod
137 - def Restore(cls, state):
138 """Restore the _QueuedOpCode from the serialized form. 139 140 @type state: dict 141 @param state: the serialized state 142 @rtype: _QueuedOpCode 143 @return: a new _QueuedOpCode instance 144 145 """ 146 obj = _QueuedOpCode.__new__(cls) 147 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 148 obj.status = state["status"] 149 obj.result = state["result"] 150 obj.log = state["log"] 151 obj.start_timestamp = state.get("start_timestamp", None) 152 obj.exec_timestamp = state.get("exec_timestamp", None) 153 obj.end_timestamp = state.get("end_timestamp", None) 154 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 155 return obj
156
157 - def Serialize(self):
158 """Serializes this _QueuedOpCode. 159 160 @rtype: dict 161 @return: the dictionary holding the serialized state 162 163 """ 164 return { 165 "input": self.input.__getstate__(), 166 "status": self.status, 167 "result": self.result, 168 "log": self.log, 169 "start_timestamp": self.start_timestamp, 170 "exec_timestamp": self.exec_timestamp, 171 "end_timestamp": self.end_timestamp, 172 "priority": self.priority, 173 }
174
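# Illustrative sketch, not part of the module: a Serialize()/Restore() round
# trip for a single opcode.  This assumes a working ganeti installation so
# that ganeti.opcodes and ganeti.jqueue are importable; OpTestDelay is used
# only because it is a simple opcode to instantiate.
from ganeti import opcodes
from ganeti import jqueue

op = opcodes.OpTestDelay(duration=0)            # any opcode would do
queued = jqueue._QueuedOpCode(op)               # wrap it for the queue
state = queued.Serialize()                      # plain, JSON-serializable dict
restored = jqueue._QueuedOpCode.Restore(state)  # rebuild from the dict
assert restored.status == queued.status
assert restored.priority == queued.priority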
175 176 -class _QueuedJob(object):
177 """In-memory job representation. 178 179 This is what we use to track the user-submitted jobs. Locking must 180 be taken care of by users of this class. 181 182 @type queue: L{JobQueue} 183 @ivar queue: the parent queue 184 @ivar id: the job ID 185 @type ops: list 186 @ivar ops: the list of _QueuedOpCode that constitute the job 187 @type log_serial: int 188 @ivar log_serial: holds the index for the next log entry 189 @ivar received_timestamp: the timestamp for when the job was received 190 @ivar start_timestmap: the timestamp for start of execution 191 @ivar end_timestamp: the timestamp for end of execution 192 @ivar writable: Whether the job is allowed to be modified 193 194 """ 195 # pylint: disable=W0212 196 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 197 "received_timestamp", "start_timestamp", "end_timestamp", 198 "writable", "archived", 199 "livelock", "process_id", 200 "__weakref__"] 201
202 - def AddReasons(self, pickup=False):
203 """Extend the reason trail 204 205 Add the reason for all the opcodes of this job to be executed. 206 207 """ 208 count = 0 209 for queued_op in self.ops: 210 op = queued_op.input 211 if pickup: 212 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP 213 else: 214 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE 215 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, 216 reason_src_prefix) 217 reason_text = "job=%d;index=%d" % (self.id, count) 218 reason = getattr(op, "reason", []) 219 reason.append((reason_src, reason_text, utils.EpochNano())) 220 op.reason = reason 221 count = count + 1
222
223 - def __init__(self, queue, job_id, ops, writable):
224 """Constructor for the _QueuedJob. 225 226 @type queue: L{JobQueue} 227 @param queue: our parent queue 228 @type job_id: job_id 229 @param job_id: our job id 230 @type ops: list 231 @param ops: the list of opcodes we hold, which will be encapsulated 232 in _QueuedOpCodes 233 @type writable: bool 234 @param writable: Whether job can be modified 235 236 """ 237 if not ops: 238 raise errors.GenericError("A job needs at least one opcode") 239 240 self.queue = queue 241 self.id = int(job_id) 242 self.ops = [_QueuedOpCode(op) for op in ops] 243 self.AddReasons() 244 self.log_serial = 0 245 self.received_timestamp = TimeStampNow() 246 self.start_timestamp = None 247 self.end_timestamp = None 248 self.archived = False 249 self.livelock = None 250 self.process_id = None 251 252 self._InitInMemory(self, writable) 253 254 assert not self.archived, "New jobs can not be marked as archived"
255 256 @staticmethod
257 - def _InitInMemory(obj, writable):
258 """Initializes in-memory variables. 259 260 """ 261 obj.writable = writable 262 obj.ops_iter = None 263 obj.cur_opctx = None
264
265 - def __repr__(self):
266 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 267 "id=%s" % self.id, 268 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 269 270 return "<%s at %#x>" % (" ".join(status), id(self))
271 272 @classmethod
273 - def Restore(cls, queue, state, writable, archived):
274 """Restore a _QueuedJob from serialized state: 275 276 @type queue: L{JobQueue} 277 @param queue: to which queue the restored job belongs 278 @type state: dict 279 @param state: the serialized state 280 @type writable: bool 281 @param writable: Whether job can be modified 282 @type archived: bool 283 @param archived: Whether job was already archived 284 @rtype: _JobQueue 285 @return: the restored _JobQueue instance 286 287 """ 288 obj = _QueuedJob.__new__(cls) 289 obj.queue = queue 290 obj.id = int(state["id"]) 291 obj.received_timestamp = state.get("received_timestamp", None) 292 obj.start_timestamp = state.get("start_timestamp", None) 293 obj.end_timestamp = state.get("end_timestamp", None) 294 obj.archived = archived 295 obj.livelock = state.get("livelock", None) 296 obj.process_id = state.get("process_id", None) 297 if obj.process_id is not None: 298 obj.process_id = int(obj.process_id) 299 300 obj.ops = [] 301 obj.log_serial = 0 302 for op_state in state["ops"]: 303 op = _QueuedOpCode.Restore(op_state) 304 for log_entry in op.log: 305 obj.log_serial = max(obj.log_serial, log_entry[0]) 306 obj.ops.append(op) 307 308 cls._InitInMemory(obj, writable) 309 310 return obj
311
312 - def Serialize(self):
313 """Serialize the _JobQueue instance. 314 315 @rtype: dict 316 @return: the serialized state 317 318 """ 319 return { 320 "id": self.id, 321 "ops": [op.Serialize() for op in self.ops], 322 "start_timestamp": self.start_timestamp, 323 "end_timestamp": self.end_timestamp, 324 "received_timestamp": self.received_timestamp, 325 "livelock": self.livelock, 326 "process_id": self.process_id, 327 }
328
329 - def CalcStatus(self):
330 """Compute the status of this job. 331 332 This function iterates over all the _QueuedOpCodes in the job and 333 based on their status, computes the job status. 334 335 The algorithm is: 336 - if we find a cancelled, or finished with error, the job 337 status will be the same 338 - otherwise, the last opcode with the status one of: 339 - waitlock 340 - canceling 341 - running 342 343 will determine the job status 344 345 - otherwise, it means either all opcodes are queued, or success, 346 and the job status will be the same 347 348 @return: the job status 349 350 """ 351 status = constants.JOB_STATUS_QUEUED 352 353 all_success = True 354 for op in self.ops: 355 if op.status == constants.OP_STATUS_SUCCESS: 356 continue 357 358 all_success = False 359 360 if op.status == constants.OP_STATUS_QUEUED: 361 pass 362 elif op.status == constants.OP_STATUS_WAITING: 363 status = constants.JOB_STATUS_WAITING 364 elif op.status == constants.OP_STATUS_RUNNING: 365 status = constants.JOB_STATUS_RUNNING 366 elif op.status == constants.OP_STATUS_CANCELING: 367 status = constants.JOB_STATUS_CANCELING 368 break 369 elif op.status == constants.OP_STATUS_ERROR: 370 status = constants.JOB_STATUS_ERROR 371 # The whole job fails if one opcode failed 372 break 373 elif op.status == constants.OP_STATUS_CANCELED: 374 status = constants.OP_STATUS_CANCELED 375 break 376 377 if all_success: 378 status = constants.JOB_STATUS_SUCCESS 379 380 return status
381
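# Illustrative sketch, not part of the module: a simplified mirror of the
# status aggregation done by CalcStatus() above, using plain strings instead
# of the constants module so it can run on its own.
def _calc_status_sketch(op_statuses):
  status = "queued"
  all_success = True
  for op_status in op_statuses:
    if op_status == "success":
      continue
    all_success = False
    if op_status == "queued":
      pass
    elif op_status == "waiting":
      status = "waiting"
    elif op_status == "running":
      status = "running"
    elif op_status in ("canceling", "error", "canceled"):
      # A canceling, failed or cancelled opcode decides the job status
      status = op_status
      break
  if all_success:
    status = "success"
  return status

assert _calc_status_sketch(["success", "success"]) == "success"
assert _calc_status_sketch(["success", "running", "queued"]) == "running"
assert _calc_status_sketch(["success", "error", "queued"]) == "error"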
382 - def CalcPriority(self):
383 """Gets the current priority for this job. 384 385 Only unfinished opcodes are considered. When all are done, the default 386 priority is used. 387 388 @rtype: int 389 390 """ 391 priorities = [op.priority for op in self.ops 392 if op.status not in constants.OPS_FINALIZED] 393 394 if not priorities: 395 # All opcodes are done, assume default priority 396 return constants.OP_PRIO_DEFAULT 397 398 return min(priorities)
399
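# Illustrative sketch, not part of the module: CalcPriority() returns the
# numerically smallest priority among unfinished opcodes; in Ganeti lower
# numbers mean higher priority (OP_PRIO_HIGHEST is negative).
unfinished_priorities = [0, -10, 0]   # e.g. one opcode was bumped to -10
print min(unfinished_priorities)      # -> -10, the job's effective priority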
400 - def GetLogEntries(self, newer_than):
401 """Selectively returns the log entries. 402 403 @type newer_than: None or int 404 @param newer_than: if this is None, return all log entries, 405 otherwise return only the log entries with serial higher 406 than this value 407 @rtype: list 408 @return: the list of the log entries selected 409 410 """ 411 if newer_than is None: 412 serial = -1 413 else: 414 serial = newer_than 415 416 entries = [] 417 for op in self.ops: 418 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 419 420 return entries
421
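# Illustrative sketch, not part of the module: how log entries are selected
# by serial number.  Each entry is (log_serial, timestamp, type, message) and
# "newer_than" excludes everything up to and including that serial.
log = [(1, (0, 0), "message", "first"),
       (2, (0, 0), "message", "second"),
       (3, (0, 0), "message", "third")]

newer_than = 1
selected = [entry for entry in log if entry[0] > newer_than]
print selected  # -> only the entries with serial 2 and 3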
422 - def MarkUnfinishedOps(self, status, result):
423 """Mark unfinished opcodes with a given status and result. 424 425 This is an utility function for marking all running or waiting to 426 be run opcodes with a given status. Opcodes which are already 427 finalised are not changed. 428 429 @param status: a given opcode status 430 @param result: the opcode result 431 432 """ 433 not_marked = True 434 for op in self.ops: 435 if op.status in constants.OPS_FINALIZED: 436 assert not_marked, "Finalized opcodes found after non-finalized ones" 437 continue 438 op.status = status 439 op.result = result 440 not_marked = False
441
442 - def Finalize(self):
443 """Marks the job as finalized. 444 445 """ 446 self.end_timestamp = TimeStampNow()
447
448 - def Cancel(self):
449 """Marks job as canceled/-ing if possible. 450 451 @rtype: tuple; (bool, string) 452 @return: Boolean describing whether job was successfully canceled or marked 453 as canceling and a text message 454 455 """ 456 status = self.CalcStatus() 457 458 if status == constants.JOB_STATUS_QUEUED: 459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 460 "Job canceled by request") 461 self.Finalize() 462 return (True, "Job %s canceled" % self.id) 463 464 elif status == constants.JOB_STATUS_WAITING: 465 # The worker will notice the new status and cancel the job 466 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 467 return (True, "Job %s will be canceled" % self.id) 468 469 else: 470 logging.debug("Job %s is no longer waiting in the queue", self.id) 471 return (False, "Job %s is no longer waiting in the queue" % self.id)
472
473 - def ChangePriority(self, priority):
474 """Changes the job priority. 475 476 @type priority: int 477 @param priority: New priority 478 @rtype: tuple; (bool, string) 479 @return: Boolean describing whether job's priority was successfully changed 480 and a text message 481 482 """ 483 status = self.CalcStatus() 484 485 if status in constants.JOBS_FINALIZED: 486 return (False, "Job %s is finished" % self.id) 487 elif status == constants.JOB_STATUS_CANCELING: 488 return (False, "Job %s is cancelling" % self.id) 489 else: 490 assert status in (constants.JOB_STATUS_QUEUED, 491 constants.JOB_STATUS_WAITING, 492 constants.JOB_STATUS_RUNNING) 493 494 changed = False 495 for op in self.ops: 496 if (op.status == constants.OP_STATUS_RUNNING or 497 op.status in constants.OPS_FINALIZED): 498 assert not changed, \ 499 ("Found opcode for which priority should not be changed after" 500 " priority has been changed for previous opcodes") 501 continue 502 503 assert op.status in (constants.OP_STATUS_QUEUED, 504 constants.OP_STATUS_WAITING) 505 506 changed = True 507 508 # Set new priority (doesn't modify opcode input) 509 op.priority = priority 510 511 if changed: 512 return (True, ("Priorities of pending opcodes for job %s have been" 513 " changed to %s" % (self.id, priority))) 514 else: 515 return (False, "Job %s had no pending opcodes" % self.id)
516
517 - def SetPid(self, pid):
518 """Sets the job's process ID 519 520 @type pid: int 521 @param pid: the process ID 522 523 """ 524 status = self.CalcStatus() 525 526 if status in (constants.JOB_STATUS_QUEUED, 527 constants.JOB_STATUS_WAITING): 528 if self.process_id is not None: 529 logging.warning("Replacing the process id %s of job %s with %s", 530 self.process_id, self.id, pid) 531 self.process_id = pid 532 else: 533 logging.warning("Can set pid only for queued/waiting jobs")
534
535 536 -class _OpExecCallbacks(mcpu.OpExecCbBase):
537
538 - def __init__(self, queue, job, op):
539 """Initializes this class. 540 541 @type queue: L{JobQueue} 542 @param queue: Job queue 543 @type job: L{_QueuedJob} 544 @param job: Job object 545 @type op: L{_QueuedOpCode} 546 @param op: OpCode 547 548 """ 549 super(_OpExecCallbacks, self).__init__() 550 551 assert queue, "Queue is missing" 552 assert job, "Job is missing" 553 assert op, "Opcode is missing" 554 555 self._queue = queue 556 self._job = job 557 self._op = op
558
559 - def _CheckCancel(self):
560 """Raises an exception to cancel the job if asked to. 561 562 """ 563 # Cancel here if we were asked to 564 if self._op.status == constants.OP_STATUS_CANCELING: 565 logging.debug("Canceling opcode") 566 raise CancelJob()
567
568 - def NotifyStart(self):
569 """Mark the opcode as running, not lock-waiting. 570 571 This is called from the mcpu code as a notifier function, when the LU is 572 finally about to start the Exec() method. Of course, to have end-user 573 visible results, the opcode must be initially (before calling into 574 Processor.ExecOpCode) set to OP_STATUS_WAITING. 575 576 """ 577 assert self._op in self._job.ops 578 assert self._op.status in (constants.OP_STATUS_WAITING, 579 constants.OP_STATUS_CANCELING) 580 581 # Cancel here if we were asked to 582 self._CheckCancel() 583 584 logging.debug("Opcode is now running") 585 586 self._op.status = constants.OP_STATUS_RUNNING 587 self._op.exec_timestamp = TimeStampNow() 588 589 # And finally replicate the job status 590 self._queue.UpdateJobUnlocked(self._job)
591
592 - def NotifyRetry(self):
593 """Mark opcode again as lock-waiting. 594 595 This is called from the mcpu code just after calling PrepareRetry. 596 The opcode will now again acquire locks (more, hopefully). 597 598 """ 599 self._op.status = constants.OP_STATUS_WAITING 600 logging.debug("Opcode will be retried. Back to waiting.")
601
602 - def _AppendFeedback(self, timestamp, log_type, log_msg):
603 """Internal feedback append function, with locks 604 605 """ 606 self._job.log_serial += 1 607 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 608 self._queue.UpdateJobUnlocked(self._job, replicate=False)
609
610 - def Feedback(self, *args):
611 """Append a log entry. 612 613 """ 614 assert len(args) < 3 615 616 if len(args) == 1: 617 log_type = constants.ELOG_MESSAGE 618 log_msg = args[0] 619 else: 620 (log_type, log_msg) = args 621 622 # The time is split to make serialization easier and not lose 623 # precision. 624 timestamp = utils.SplitTime(time.time()) 625 self._AppendFeedback(timestamp, log_type, log_msg)
626
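# Illustrative sketch, not part of the module: Feedback() accepts either a
# single message or an explicit (log_type, message) pair; the one-argument
# form defaults the type.  The string "message" stands in for
# constants.ELOG_MESSAGE, and "example-type" is just a placeholder value.
def _normalize_feedback_args_sketch(*args):
  assert len(args) in (1, 2)
  if len(args) == 1:
    return ("message", args[0])
  return args

print _normalize_feedback_args_sketch("50% done")
# -> ('message', '50% done')
print _normalize_feedback_args_sketch("example-type", "50% done")
# -> ('example-type', '50% done')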
627 - def CurrentPriority(self):
628 """Returns current priority for opcode. 629 630 """ 631 assert self._op.status in (constants.OP_STATUS_WAITING, 632 constants.OP_STATUS_CANCELING) 633 634 # Cancel here if we were asked to 635 self._CheckCancel() 636 637 return self._op.priority
638
639 - def SubmitManyJobs(self, jobs):
640 """Submits jobs for processing. 641 642 See L{JobQueue.SubmitManyJobs}. 643 644 """ 645 # Locking is done in job queue 646 return self._queue.SubmitManyJobs(jobs)
647
648 649 -def _EncodeOpError(err):
650 """Encodes an error which occurred while processing an opcode. 651 652 """ 653 if isinstance(err, errors.GenericError): 654 to_encode = err 655 else: 656 to_encode = errors.OpExecError(str(err)) 657 658 return errors.EncodeException(to_encode)
659
660 661 -class _TimeoutStrategyWrapper:
662 - def __init__(self, fn):
663 """Initializes this class. 664 665 """ 666 self._fn = fn 667 self._next = None
668
669 - def _Advance(self):
670 """Gets the next timeout if necessary. 671 672 """ 673 if self._next is None: 674 self._next = self._fn()
675
676 - def Peek(self):
677 """Returns the next timeout. 678 679 """ 680 self._Advance() 681 return self._next
682
683 - def Next(self):
684 """Returns the current timeout and advances the internal state. 685 686 """ 687 self._Advance() 688 result = self._next 689 self._next = None 690 return result
691
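# Illustrative sketch, not part of the module: the Peek()/Next() contract of
# _TimeoutStrategyWrapper above, driven by a stand-in callable that yields a
# fixed sequence of lock timeouts ending in None (blocking acquire).
timeouts = iter([1.0, 2.0, None])
next_attempt = lambda: next(timeouts)  # stands in for a strategy's NextAttempt

wrapper = _TimeoutStrategyWrapper(next_attempt)
assert wrapper.Peek() == 1.0   # peeking caches the value ...
assert wrapper.Peek() == 1.0   # ... and does not consume it
assert wrapper.Next() == 1.0   # Next() consumes and advances
assert wrapper.Peek() == 2.0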
692 693 -class _OpExecContext:
694 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
695 """Initializes this class. 696 697 """ 698 self.op = op 699 self.index = index 700 self.log_prefix = log_prefix 701 self.summary = op.input.Summary() 702 703 # Create local copy to modify 704 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 705 self.jobdeps = op.input.depends[:] 706 else: 707 self.jobdeps = None 708 709 self._timeout_strategy_factory = timeout_strategy_factory 710 self._ResetTimeoutStrategy()
711
712 - def _ResetTimeoutStrategy(self):
713 """Creates a new timeout strategy. 714 715 """ 716 self._timeout_strategy = \ 717 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
718
719 - def CheckPriorityIncrease(self):
720 """Checks whether priority can and should be increased. 721 722 Called when locks couldn't be acquired. 723 724 """ 725 op = self.op 726 727 # Exhausted all retries and next round should not use blocking acquire 728 # for locks? 729 if (self._timeout_strategy.Peek() is None and 730 op.priority > constants.OP_PRIO_HIGHEST): 731 logging.debug("Increasing priority") 732 op.priority -= 1 733 self._ResetTimeoutStrategy() 734 return True 735 736 return False
737
738 - def GetNextLockTimeout(self):
739 """Returns the next lock acquire timeout. 740 741 """ 742 return self._timeout_strategy.Next()
743
744 745 -class _JobProcessor(object):
746 (DEFER, 747 WAITDEP, 748 FINISHED) = range(1, 4) 749
750 - def __init__(self, queue, opexec_fn, job, 751 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
752 """Initializes this class. 753 754 """ 755 self.queue = queue 756 self.opexec_fn = opexec_fn 757 self.job = job 758 self._timeout_strategy_factory = _timeout_strategy_factory
759 760 @staticmethod
761 - def _FindNextOpcode(job, timeout_strategy_factory):
762 """Locates the next opcode to run. 763 764 @type job: L{_QueuedJob} 765 @param job: Job object 766 @param timeout_strategy_factory: Callable to create new timeout strategy 767 768 """ 769 # Create some sort of a cache to speed up locating next opcode for future 770 # lookups 771 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 772 # pending and one for processed ops. 773 if job.ops_iter is None: 774 job.ops_iter = enumerate(job.ops) 775 776 # Find next opcode to run 777 while True: 778 try: 779 (idx, op) = job.ops_iter.next() 780 except StopIteration: 781 raise errors.ProgrammerError("Called for a finished job") 782 783 if op.status == constants.OP_STATUS_RUNNING: 784 # Found an opcode already marked as running 785 raise errors.ProgrammerError("Called for job marked as running") 786 787 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 788 timeout_strategy_factory) 789 790 if op.status not in constants.OPS_FINALIZED: 791 return opctx 792 793 # This is a job that was partially completed before master daemon 794 # shutdown, so it can be expected that some opcodes are already 795 # completed successfully (if any did error out, then the whole job 796 # should have been aborted and not resubmitted for processing). 797 logging.info("%s: opcode %s already processed, skipping", 798 opctx.log_prefix, opctx.summary)
799 800 @staticmethod
801 - def _MarkWaitlock(job, op):
802 """Marks an opcode as waiting for locks. 803 804 The job's start timestamp is also set if necessary. 805 806 @type job: L{_QueuedJob} 807 @param job: Job object 808 @type op: L{_QueuedOpCode} 809 @param op: Opcode object 810 811 """ 812 assert op in job.ops 813 assert op.status in (constants.OP_STATUS_QUEUED, 814 constants.OP_STATUS_WAITING) 815 816 update = False 817 818 op.result = None 819 820 if op.status == constants.OP_STATUS_QUEUED: 821 op.status = constants.OP_STATUS_WAITING 822 update = True 823 824 if op.start_timestamp is None: 825 op.start_timestamp = TimeStampNow() 826 update = True 827 828 if job.start_timestamp is None: 829 job.start_timestamp = op.start_timestamp 830 update = True 831 832 assert op.status == constants.OP_STATUS_WAITING 833 834 return update
835 836 @staticmethod
837 - def _CheckDependencies(queue, job, opctx):
838 """Checks if an opcode has dependencies and if so, processes them. 839 840 @type queue: L{JobQueue} 841 @param queue: Queue object 842 @type job: L{_QueuedJob} 843 @param job: Job object 844 @type opctx: L{_OpExecContext} 845 @param opctx: Opcode execution context 846 @rtype: bool 847 @return: Whether opcode will be re-scheduled by dependency tracker 848 849 """ 850 op = opctx.op 851 852 result = False 853 854 while opctx.jobdeps: 855 (dep_job_id, dep_status) = opctx.jobdeps[0] 856 857 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 858 dep_status) 859 assert ht.TNonEmptyString(depmsg), "No dependency message" 860 861 logging.info("%s: %s", opctx.log_prefix, depmsg) 862 863 if depresult == _JobDependencyManager.CONTINUE: 864 # Remove dependency and continue 865 opctx.jobdeps.pop(0) 866 867 elif depresult == _JobDependencyManager.WAIT: 868 # Need to wait for notification, dependency tracker will re-add job 869 # to workerpool 870 result = True 871 break 872 873 elif depresult == _JobDependencyManager.CANCEL: 874 # Job was cancelled, cancel this job as well 875 job.Cancel() 876 assert op.status == constants.OP_STATUS_CANCELING 877 break 878 879 elif depresult in (_JobDependencyManager.WRONGSTATUS, 880 _JobDependencyManager.ERROR): 881 # Job failed or there was an error, this job must fail 882 op.status = constants.OP_STATUS_ERROR 883 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 884 break 885 886 else: 887 raise errors.ProgrammerError("Unknown dependency result '%s'" % 888 depresult) 889 890 return result
891
892 - def _ExecOpCodeUnlocked(self, opctx):
893 """Processes one opcode and returns the result. 894 895 """ 896 op = opctx.op 897 898 assert op.status in (constants.OP_STATUS_WAITING, 899 constants.OP_STATUS_CANCELING) 900 901 # The very last check if the job was cancelled before trying to execute 902 if op.status == constants.OP_STATUS_CANCELING: 903 return (constants.OP_STATUS_CANCELING, None) 904 905 timeout = opctx.GetNextLockTimeout() 906 907 try: 908 # Make sure not to hold queue lock while calling ExecOpCode 909 result = self.opexec_fn(op.input, 910 _OpExecCallbacks(self.queue, self.job, op), 911 timeout=timeout) 912 except mcpu.LockAcquireTimeout: 913 assert timeout is not None, "Received timeout for blocking acquire" 914 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 915 916 assert op.status in (constants.OP_STATUS_WAITING, 917 constants.OP_STATUS_CANCELING) 918 919 # Was job cancelled while we were waiting for the lock? 920 if op.status == constants.OP_STATUS_CANCELING: 921 return (constants.OP_STATUS_CANCELING, None) 922 923 # Stay in waitlock while trying to re-acquire lock 924 return (constants.OP_STATUS_WAITING, None) 925 except CancelJob: 926 logging.exception("%s: Canceling job", opctx.log_prefix) 927 assert op.status == constants.OP_STATUS_CANCELING 928 return (constants.OP_STATUS_CANCELING, None) 929 930 except Exception, err: # pylint: disable=W0703 931 logging.exception("%s: Caught exception in %s", 932 opctx.log_prefix, opctx.summary) 933 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 934 else: 935 logging.debug("%s: %s successful", 936 opctx.log_prefix, opctx.summary) 937 return (constants.OP_STATUS_SUCCESS, result)
938
939 - def __call__(self, _nextop_fn=None):
940 """Continues execution of a job. 941 942 @param _nextop_fn: Callback function for tests 943 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 944 be deferred and C{WAITDEP} if the dependency manager 945 (L{_JobDependencyManager}) will re-schedule the job when appropriate 946 947 """ 948 queue = self.queue 949 job = self.job 950 951 logging.debug("Processing job %s", job.id) 952 953 try: 954 opcount = len(job.ops) 955 956 assert job.writable, "Expected writable job" 957 958 # Don't do anything for finalized jobs 959 if job.CalcStatus() in constants.JOBS_FINALIZED: 960 return self.FINISHED 961 962 # Is a previous opcode still pending? 963 if job.cur_opctx: 964 opctx = job.cur_opctx 965 job.cur_opctx = None 966 else: 967 if __debug__ and _nextop_fn: 968 _nextop_fn() 969 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 970 971 op = opctx.op 972 973 # Consistency check 974 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 975 constants.OP_STATUS_CANCELING) 976 for i in job.ops[opctx.index + 1:]) 977 978 assert op.status in (constants.OP_STATUS_QUEUED, 979 constants.OP_STATUS_WAITING, 980 constants.OP_STATUS_CANCELING) 981 982 assert (op.priority <= constants.OP_PRIO_LOWEST and 983 op.priority >= constants.OP_PRIO_HIGHEST) 984 985 waitjob = None 986 987 if op.status != constants.OP_STATUS_CANCELING: 988 assert op.status in (constants.OP_STATUS_QUEUED, 989 constants.OP_STATUS_WAITING) 990 991 # Prepare to start opcode 992 if self._MarkWaitlock(job, op): 993 # Write to disk 994 queue.UpdateJobUnlocked(job) 995 996 assert op.status == constants.OP_STATUS_WAITING 997 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 998 assert job.start_timestamp and op.start_timestamp 999 assert waitjob is None 1000 1001 # Check if waiting for a job is necessary 1002 waitjob = self._CheckDependencies(queue, job, opctx) 1003 1004 assert op.status in (constants.OP_STATUS_WAITING, 1005 constants.OP_STATUS_CANCELING, 1006 constants.OP_STATUS_ERROR) 1007 1008 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1009 constants.OP_STATUS_ERROR)): 1010 logging.info("%s: opcode %s waiting for locks", 1011 opctx.log_prefix, opctx.summary) 1012 1013 assert not opctx.jobdeps, "Not all dependencies were removed" 1014 1015 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1016 1017 op.status = op_status 1018 op.result = op_result 1019 1020 assert not waitjob 1021 1022 if op.status in (constants.OP_STATUS_WAITING, 1023 constants.OP_STATUS_QUEUED): 1024 # waiting: Couldn't get locks in time 1025 # queued: Queue is shutting down 1026 assert not op.end_timestamp 1027 else: 1028 # Finalize opcode 1029 op.end_timestamp = TimeStampNow() 1030 1031 if op.status == constants.OP_STATUS_CANCELING: 1032 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1033 for i in job.ops[opctx.index:]) 1034 else: 1035 assert op.status in constants.OPS_FINALIZED 1036 1037 if op.status == constants.OP_STATUS_QUEUED: 1038 # Queue is shutting down 1039 assert not waitjob 1040 1041 finalize = False 1042 1043 # Reset context 1044 job.cur_opctx = None 1045 1046 # In no case must the status be finalized here 1047 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1048 1049 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1050 finalize = False 1051 1052 if not waitjob and opctx.CheckPriorityIncrease(): 1053 # Priority was changed, need to update on-disk file 1054 queue.UpdateJobUnlocked(job) 1055 1056 # Keep around for another round 1057 job.cur_opctx = opctx 
1058 1059 assert (op.priority <= constants.OP_PRIO_LOWEST and 1060 op.priority >= constants.OP_PRIO_HIGHEST) 1061 1062 # In no case must the status be finalized here 1063 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1064 1065 else: 1066 # Ensure all opcodes so far have been successful 1067 assert (opctx.index == 0 or 1068 compat.all(i.status == constants.OP_STATUS_SUCCESS 1069 for i in job.ops[:opctx.index])) 1070 1071 # Reset context 1072 job.cur_opctx = None 1073 1074 if op.status == constants.OP_STATUS_SUCCESS: 1075 finalize = False 1076 1077 elif op.status == constants.OP_STATUS_ERROR: 1078 # If we get here, we cannot afford to check for any consistency 1079 # any more, we just want to clean up. 1080 # TODO: Actually, it wouldn't be a bad idea to start a timer 1081 # here to kill the whole process. 1082 to_encode = errors.OpExecError("Preceding opcode failed") 1083 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1084 _EncodeOpError(to_encode)) 1085 finalize = True 1086 elif op.status == constants.OP_STATUS_CANCELING: 1087 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1088 "Job canceled by request") 1089 finalize = True 1090 1091 else: 1092 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1093 1094 if opctx.index == (opcount - 1): 1095 # Finalize on last opcode 1096 finalize = True 1097 1098 if finalize: 1099 # All opcodes have been run, finalize job 1100 job.Finalize() 1101 1102 # Write to disk. If the job status is final, this is the final write 1103 # allowed. Once the file has been written, it can be archived anytime. 1104 queue.UpdateJobUnlocked(job) 1105 1106 assert not waitjob 1107 1108 if finalize: 1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1110 return self.FINISHED 1111 1112 assert not waitjob or queue.depmgr.JobWaiting(job) 1113 1114 if waitjob: 1115 return self.WAITDEP 1116 else: 1117 return self.DEFER 1118 finally: 1119 assert job.writable, "Job became read-only while being processed"
1120
1121 1122 -class _JobDependencyManager:
1123 """Keeps track of job dependencies. 1124 1125 """ 1126 (WAIT, 1127 ERROR, 1128 CANCEL, 1129 CONTINUE, 1130 WRONGSTATUS) = range(1, 6) 1131
1132 - def __init__(self, getstatus_fn):
1133 """Initializes this class. 1134 1135 """ 1136 self._getstatus_fn = getstatus_fn 1137 1138 self._waiters = {}
1139
1140 - def JobWaiting(self, job):
1141 """Checks if a job is waiting. 1142 1143 """ 1144 return compat.any(job in jobs 1145 for jobs in self._waiters.values())
1146
1147 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1148 """Checks if a dependency job has the requested status. 1149 1150 If the other job is not yet in a finalized status, the calling job will be 1151 notified (re-added to the workerpool) at a later point. 1152 1153 @type job: L{_QueuedJob} 1154 @param job: Job object 1155 @type dep_job_id: int 1156 @param dep_job_id: ID of dependency job 1157 @type dep_status: list 1158 @param dep_status: Required status 1159 1160 """ 1161 assert ht.TJobId(job.id) 1162 assert ht.TJobId(dep_job_id) 1163 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1164 1165 if job.id == dep_job_id: 1166 return (self.ERROR, "Job can't depend on itself") 1167 1168 # Get status of dependency job 1169 try: 1170 status = self._getstatus_fn(dep_job_id) 1171 except errors.JobLost, err: 1172 return (self.ERROR, "Dependency error: %s" % err) 1173 1174 assert status in constants.JOB_STATUS_ALL 1175 1176 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1177 1178 if status not in constants.JOBS_FINALIZED: 1179 # Register for notification and wait for job to finish 1180 job_id_waiters.add(job) 1181 return (self.WAIT, 1182 "Need to wait for job %s, wanted status '%s'" % 1183 (dep_job_id, dep_status)) 1184 1185 # Remove from waiters list 1186 if job in job_id_waiters: 1187 job_id_waiters.remove(job) 1188 1189 if (status == constants.JOB_STATUS_CANCELED and 1190 constants.JOB_STATUS_CANCELED not in dep_status): 1191 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1192 1193 elif not dep_status or status in dep_status: 1194 return (self.CONTINUE, 1195 "Dependency job %s finished with status '%s'" % 1196 (dep_job_id, status)) 1197 1198 else: 1199 return (self.WRONGSTATUS, 1200 "Dependency job %s finished with status '%s'," 1201 " not one of '%s' as required" % 1202 (dep_job_id, status, utils.CommaJoin(dep_status)))
1203
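# Illustrative sketch, not part of the module: a simplified mirror of the
# decision taken by CheckAndRegister() above, with plain strings for job
# statuses so the example runs on its own.  Here "success", "error" and
# "canceled" mirror constants.JOBS_FINALIZED.
def _dependency_decision_sketch(dep_current_status, wanted_statuses):
  finalized = ("success", "error", "canceled")
  if dep_current_status not in finalized:
    return "wait"         # dependency still running: register and wait
  if dep_current_status == "canceled" and "canceled" not in wanted_statuses:
    return "cancel"       # dependency was cancelled: cancel this job too
  if not wanted_statuses or dep_current_status in wanted_statuses:
    return "continue"     # dependency finished as required
  return "wrongstatus"    # finished, but not with an accepted status

assert _dependency_decision_sketch("running", ["success"]) == "wait"
assert _dependency_decision_sketch("success", []) == "continue"
assert _dependency_decision_sketch("error", ["success"]) == "wrongstatus"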
1205 """Remove all jobs without actual waiters. 1206 1207 """ 1208 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1209 if not waiters]: 1210 del self._waiters[job_id]
1211
1212 1213 -class JobQueue(object):
1214 """Queue used to manage the jobs. 1215 1216 """
1217 - def __init__(self, context, cfg):
1218 """Constructor for JobQueue. 1219 1220 The constructor will initialize the job queue object and then 1221 start loading the current jobs from disk, either for starting them 1222 (if they were queue) or for aborting them (if they were already 1223 running). 1224 1225 @type context: GanetiContext 1226 @param context: the context object for access to the configuration 1227 data and other ganeti objects 1228 1229 """ 1230 self.context = context 1231 self._memcache = weakref.WeakValueDictionary() 1232 self._my_hostname = netutils.Hostname.GetSysName() 1233 1234 # Get initial list of nodes 1235 self._nodes = dict((n.name, n.primary_ip) 1236 for n in cfg.GetAllNodesInfo().values() 1237 if n.master_candidate) 1238 1239 # Remove master node 1240 self._nodes.pop(self._my_hostname, None) 1241 1242 # Job dependencies 1243 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
1244
1245 - def _GetRpc(self, address_list):
1246 """Gets RPC runner with context. 1247 1248 """ 1249 return rpc.JobQueueRunner(self.context, address_list)
1250 1251 @staticmethod
1252 - def _CheckRpcResult(result, nodes, failmsg):
1253 """Verifies the status of an RPC call. 1254 1255 Since we aim to keep consistency should this node (the current 1256 master) fail, we will log errors if our rpc fail, and especially 1257 log the case when more than half of the nodes fails. 1258 1259 @param result: the data as returned from the rpc call 1260 @type nodes: list 1261 @param nodes: the list of nodes we made the call to 1262 @type failmsg: str 1263 @param failmsg: the identifier to be used for logging 1264 1265 """ 1266 failed = [] 1267 success = [] 1268 1269 for node in nodes: 1270 msg = result[node].fail_msg 1271 if msg: 1272 failed.append(node) 1273 logging.error("RPC call %s (%s) failed on node %s: %s", 1274 result[node].call, failmsg, node, msg) 1275 else: 1276 success.append(node) 1277 1278 # +1 for the master node 1279 if (len(success) + 1) < len(failed): 1280 # TODO: Handle failing nodes 1281 logging.error("More than half of the nodes failed")
1282
1283 - def _GetNodeIp(self):
1284 """Helper for returning the node name/ip list. 1285 1286 @rtype: (list, list) 1287 @return: a tuple of two lists, the first one with the node 1288 names and the second one with the node addresses 1289 1290 """ 1291 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1292 name_list = self._nodes.keys() 1293 addr_list = [self._nodes[name] for name in name_list] 1294 return name_list, addr_list
1295
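# Illustrative sketch, not part of the module: the transformation hinted at in
# the TODO above, turning a name->address mapping into parallel name and
# address lists with zip().  The node names and addresses are made-up data.
nodes = {"node1.example.com": "192.0.2.1", "node2.example.com": "192.0.2.2"}
name_list, addr_list = map(list, zip(*nodes.items()))
print name_list   # e.g. ['node1.example.com', 'node2.example.com']
print addr_list   # the matching ['192.0.2.1', '192.0.2.2']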
1296 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1297 """Writes a file locally and then replicates it to all nodes. 1298 1299 This function will replace the contents of a file on the local 1300 node and then replicate it to all the other nodes we have. 1301 1302 @type file_name: str 1303 @param file_name: the path of the file to be replicated 1304 @type data: str 1305 @param data: the new contents of the file 1306 @type replicate: boolean 1307 @param replicate: whether to spread the changes to the remote nodes 1308 1309 """ 1310 getents = runtime.GetEnts() 1311 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1312 gid=getents.daemons_gid, 1313 mode=constants.JOB_QUEUE_FILES_PERMS) 1314 1315 if replicate: 1316 names, addrs = self._GetNodeIp() 1317 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1318 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1319
1320 - def _RenameFilesUnlocked(self, rename):
1321 """Renames a file locally and then replicate the change. 1322 1323 This function will rename a file in the local queue directory 1324 and then replicate this rename to all the other nodes we have. 1325 1326 @type rename: list of (old, new) 1327 @param rename: List containing tuples mapping old to new names 1328 1329 """ 1330 # Rename them locally 1331 for old, new in rename: 1332 utils.RenameFile(old, new, mkdir=True) 1333 1334 # ... and on all nodes 1335 names, addrs = self._GetNodeIp() 1336 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1337 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1338 1339 @staticmethod
1340 - def _GetJobPath(job_id):
1341 """Returns the job file for a given job id. 1342 1343 @type job_id: str 1344 @param job_id: the job identifier 1345 @rtype: str 1346 @return: the path to the job file 1347 1348 """ 1349 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1350 1351 @staticmethod
1352 - def _GetArchivedJobPath(job_id):
1353 """Returns the archived job file for a give job id. 1354 1355 @type job_id: str 1356 @param job_id: the job identifier 1357 @rtype: str 1358 @return: the path to the archived job file 1359 1360 """ 1361 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1362 jstore.GetArchiveDirectory(job_id), 1363 "job-%s" % job_id)
1364 1365 @staticmethod
1366 - def _DetermineJobDirectories(archived):
1367 """Build list of directories containing job files. 1368 1369 @type archived: bool 1370 @param archived: Whether to include directories for archived jobs 1371 @rtype: list 1372 1373 """ 1374 result = [pathutils.QUEUE_DIR] 1375 1376 if archived: 1377 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 1378 result.extend(map(compat.partial(utils.PathJoin, archive_path), 1379 utils.ListVisibleFiles(archive_path))) 1380 1381 return result
1382 1383 @classmethod
1384 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1385 """Return all known job IDs. 1386 1387 The method only looks at disk because it's a requirement that all 1388 jobs are present on disk (so in the _memcache we don't have any 1389 extra IDs). 1390 1391 @type sort: boolean 1392 @param sort: perform sorting on the returned job ids 1393 @rtype: list 1394 @return: the list of job IDs 1395 1396 """ 1397 jlist = [] 1398 1399 for path in cls._DetermineJobDirectories(archived): 1400 for filename in utils.ListVisibleFiles(path): 1401 m = constants.JOB_FILE_RE.match(filename) 1402 if m: 1403 jlist.append(int(m.group(1))) 1404 1405 if sort: 1406 jlist.sort() 1407 return jlist
1408
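# Illustrative sketch, not part of the module: extracting job IDs from queue
# file names.  The pattern below assumes the usual "job-<number>" naming; the
# real code matches constants.JOB_FILE_RE instead.
import re

_JOB_FILE_RE_SKETCH = re.compile(r"^job-(\d+)$")

filenames = ["job-17", "job-42", "serial", "lock", "job-3.tmp"]
job_ids = sorted(int(m.group(1))
                 for m in map(_JOB_FILE_RE_SKETCH.match, filenames) if m)
print job_ids   # -> [17, 42]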
1409 - def _LoadJobUnlocked(self, job_id):
1410 """Loads a job from the disk or memory. 1411 1412 Given a job id, this will return the cached job object if 1413 existing, or try to load the job from the disk. If loading from 1414 disk, it will also add the job to the cache. 1415 1416 @type job_id: int 1417 @param job_id: the job id 1418 @rtype: L{_QueuedJob} or None 1419 @return: either None or the job object 1420 1421 """ 1422 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 1423 1424 job = self._memcache.get(job_id, None) 1425 if job: 1426 logging.debug("Found job %s in memcache", job_id) 1427 assert job.writable, "Found read-only job in memcache" 1428 return job 1429 1430 try: 1431 job = self._LoadJobFromDisk(job_id, False) 1432 if job is None: 1433 return job 1434 except errors.JobFileCorrupted: 1435 old_path = self._GetJobPath(job_id) 1436 new_path = self._GetArchivedJobPath(job_id) 1437 if old_path == new_path: 1438 # job already archived (future case) 1439 logging.exception("Can't parse job %s", job_id) 1440 else: 1441 # non-archived case 1442 logging.exception("Can't parse job %s, will archive.", job_id) 1443 self._RenameFilesUnlocked([(old_path, new_path)]) 1444 return None 1445 1446 assert job.writable, "Job just loaded is not writable" 1447 1448 self._memcache[job_id] = job 1449 logging.debug("Added job %s to the cache", job_id) 1450 return job
1451
1452 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1453 """Load the given job file from disk. 1454 1455 Given a job file, read, load and restore it in a _QueuedJob format. 1456 1457 @type job_id: int 1458 @param job_id: job identifier 1459 @type try_archived: bool 1460 @param try_archived: Whether to try loading an archived job 1461 @rtype: L{_QueuedJob} or None 1462 @return: either None or the job object 1463 1464 """ 1465 path_functions = [(self._GetJobPath, False)] 1466 1467 if try_archived: 1468 path_functions.append((self._GetArchivedJobPath, True)) 1469 1470 raw_data = None 1471 archived = None 1472 1473 for (fn, archived) in path_functions: 1474 filepath = fn(job_id) 1475 logging.debug("Loading job from %s", filepath) 1476 try: 1477 raw_data = utils.ReadFile(filepath) 1478 except EnvironmentError, err: 1479 if err.errno != errno.ENOENT: 1480 raise 1481 else: 1482 break 1483 1484 if not raw_data: 1485 logging.debug("No data available for job %s", job_id) 1486 return None 1487 1488 if writable is None: 1489 writable = not archived 1490 1491 try: 1492 data = serializer.LoadJson(raw_data) 1493 job = _QueuedJob.Restore(self, data, writable, archived) 1494 except Exception, err: # pylint: disable=W0703 1495 raise errors.JobFileCorrupted(err) 1496 1497 return job
1498
1499 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1500 """Load the given job file from disk. 1501 1502 Given a job file, read, load and restore it in a _QueuedJob format. 1503 In case of error reading the job, it gets returned as None, and the 1504 exception is logged. 1505 1506 @type job_id: int 1507 @param job_id: job identifier 1508 @type try_archived: bool 1509 @param try_archived: Whether to try loading an archived job 1510 @rtype: L{_QueuedJob} or None 1511 @return: either None or the job object 1512 1513 """ 1514 try: 1515 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 1516 except (errors.JobFileCorrupted, EnvironmentError): 1517 logging.exception("Can't load/parse job %s", job_id) 1518 return None
1519 1520 @classmethod
1521 - def SubmitManyJobs(cls, jobs):
1522 """Create and store multiple jobs. 1523 1524 """ 1525 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1526 1527 @staticmethod
1528 - def _ResolveJobDependencies(resolve_fn, deps):
1529 """Resolves relative job IDs in dependencies. 1530 1531 @type resolve_fn: callable 1532 @param resolve_fn: Function to resolve a relative job ID 1533 @type deps: list 1534 @param deps: Dependencies 1535 @rtype: tuple; (boolean, string or list) 1536 @return: If successful (first tuple item), the returned list contains 1537 resolved job IDs along with the requested status; if not successful, 1538 the second element is an error message 1539 1540 """ 1541 result = [] 1542 1543 for (dep_job_id, dep_status) in deps: 1544 if ht.TRelativeJobId(dep_job_id): 1545 assert ht.TInt(dep_job_id) and dep_job_id < 0 1546 try: 1547 job_id = resolve_fn(dep_job_id) 1548 except IndexError: 1549 # Abort 1550 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 1551 else: 1552 job_id = dep_job_id 1553 1554 result.append((job_id, dep_status)) 1555 1556 return (True, result)
1557
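# Illustrative sketch, not part of the module: how a relative (negative) job
# ID in a dependency is resolved against jobs submitted earlier in the same
# request.  The resolver below is a hypothetical stand-in for the resolve_fn
# passed by the caller; the job IDs are made-up.
submitted_job_ids = [100, 101, 102]   # jobs created earlier in this batch

def _resolve_sketch(relative_id):
  # -1 means "the job submitted immediately before this one", and so on
  return submitted_job_ids[relative_id]

deps = [(-1, ["success"]), (100, ["success"])]
resolved = [(_resolve_sketch(job_id) if job_id < 0 else job_id, status)
            for (job_id, status) in deps]
print resolved   # -> [(102, ['success']), (100, ['success'])]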
1558 - def _GetJobStatusForDependencies(self, job_id):
1559 """Gets the status of a job for dependencies. 1560 1561 @type job_id: int 1562 @param job_id: Job ID 1563 @raise errors.JobLost: If job can't be found 1564 1565 """ 1566 # Not using in-memory cache as doing so would require an exclusive lock 1567 1568 # Try to load from disk 1569 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1570 1571 if job: 1572 assert not job.writable, "Got writable job" # pylint: disable=E1101 1573 1574 if job: 1575 return job.CalcStatus() 1576 1577 raise errors.JobLost("Job %s not found" % job_id)
1578
1579 - def UpdateJobUnlocked(self, job, replicate=True):
1580 """Update a job's on disk storage. 1581 1582 After a job has been modified, this function needs to be called in 1583 order to write the changes to disk and replicate them to the other 1584 nodes. 1585 1586 @type job: L{_QueuedJob} 1587 @param job: the changed job 1588 @type replicate: boolean 1589 @param replicate: whether to replicate the change to remote nodes 1590 1591 """ 1592 if __debug__: 1593 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1594 assert (finalized ^ (job.end_timestamp is None)) 1595 assert job.writable, "Can't update read-only job" 1596 assert not job.archived, "Can't update archived job" 1597 1598 filename = self._GetJobPath(job.id) 1599 data = serializer.DumpJson(job.Serialize()) 1600 logging.debug("Writing job %s to %s", job.id, filename) 1601 self._UpdateJobQueueFile(filename, data, replicate)
1602
1603 - def HasJobBeenFinalized(self, job_id):
1604 """Checks if a job has been finalized. 1605 1606 @type job_id: int 1607 @param job_id: Job identifier 1608 @rtype: boolean 1609 @return: True if the job has been finalized, 1610 False if the timeout has been reached, 1611 None if the job doesn't exist 1612 1613 """ 1614 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1615 if job is not None: 1616 return job.CalcStatus() in constants.JOBS_FINALIZED 1617 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: 1618 # FIXME: The above variable is a temporary workaround until the Python job 1619 # queue is completely removed. When removing the job queue, also remove 1620 # the variable from LUClusterDestroy. 1621 return True 1622 else: 1623 return None
1624
1625 - def CancelJob(self, job_id):
1626 """Cancels a job. 1627 1628 This will only succeed if the job has not started yet. 1629 1630 @type job_id: int 1631 @param job_id: job ID of job to be cancelled. 1632 1633 """ 1634 logging.info("Cancelling job %s", job_id) 1635 1636 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1637
1638 - def ChangeJobPriority(self, job_id, priority):
1639 """Changes a job's priority. 1640 1641 @type job_id: int 1642 @param job_id: ID of the job whose priority should be changed 1643 @type priority: int 1644 @param priority: New priority 1645 1646 """ 1647 logging.info("Changing priority of job %s to %s", job_id, priority) 1648 1649 if priority not in constants.OP_PRIO_SUBMIT_VALID: 1650 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 1651 raise errors.GenericError("Invalid priority %s, allowed are %s" % 1652 (priority, allowed)) 1653 1654 def fn(job): 1655 (success, msg) = job.ChangePriority(priority) 1656 return (success, msg)
1657 1658 return self._ModifyJobUnlocked(job_id, fn)
1659
1660 - def _ModifyJobUnlocked(self, job_id, mod_fn):
1661 """Modifies a job. 1662 1663 @type job_id: int 1664 @param job_id: Job ID 1665 @type mod_fn: callable 1666 @param mod_fn: Modifying function, receiving job object as parameter, 1667 returning tuple of (status boolean, message string) 1668 1669 """ 1670 job = self._LoadJobUnlocked(job_id) 1671 if not job: 1672 logging.debug("Job %s not found", job_id) 1673 return (False, "Job %s not found" % job_id) 1674 1675 assert job.writable, "Can't modify read-only job" 1676 assert not job.archived, "Can't modify archived job" 1677 1678 (success, msg) = mod_fn(job) 1679 1680 if success: 1681 # If the job was finalized (e.g. cancelled), this is the final write 1682 # allowed. The job can be archived anytime. 1683 self.UpdateJobUnlocked(job) 1684 1685 return (success, msg)
1686