
Source Code for Package ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  """ 
  34   
  35  import logging 
  36  import errno 
  37  import time 
  38  import weakref 
  39  import threading 
  40  import itertools 
  41  import operator 
  42  import os 
  43   
  44  try: 
  45    # pylint: disable=E0611 
  46    from pyinotify import pyinotify 
  47  except ImportError: 
  48    import pyinotify 
  49   
  50  from ganeti import asyncnotifier 
  51  from ganeti import constants 
  52  from ganeti import serializer 
  53  from ganeti import locking 
  54  from ganeti import luxi 
  55  from ganeti import opcodes 
  56  from ganeti import opcodes_base 
  57  from ganeti import errors 
  58  from ganeti import mcpu 
  59  from ganeti import utils 
  60  from ganeti import jstore 
  61  import ganeti.rpc.node as rpc 
  62  from ganeti import runtime 
  63  from ganeti import netutils 
  64  from ganeti import compat 
  65  from ganeti import ht 
  66  from ganeti import query 
  67  from ganeti import qlang 
  68  from ganeti import pathutils 
  69  from ganeti import vcluster 
  70  from ganeti.cmdlib import cluster 
  71   
  72   
  73  #: Retrieves "id" attribute 
  74  _GetIdAttr = operator.attrgetter("id") 
75 76 77 -class CancelJob(Exception):
78 """Special exception to cancel a job. 79 80 """
81
82 83 -def TimeStampNow():
84 """Returns the current timestamp. 85 86 @rtype: tuple 87 @return: the current time in the (seconds, microseconds) format 88 89 """ 90 return utils.SplitTime(time.time())
91
92 93 -def _CallJqUpdate(runner, names, file_name, content):
94 """Updates job queue file after virtualizing filename. 95 96 """ 97 virt_file_name = vcluster.MakeVirtualPath(file_name) 98 return runner.call_jobqueue_update(names, virt_file_name, content)
99
100 101 -class _QueuedOpCode(object):
102 """Encapsulates an opcode object. 103 104 @ivar log: holds the execution log and consists of tuples 105 of the form C{(log_serial, timestamp, level, message)} 106 @ivar input: the OpCode we encapsulate 107 @ivar status: the current status 108 @ivar result: the result of the LU execution 109 @ivar start_timestamp: timestamp for the start of the execution 110 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 111 @ivar stop_timestamp: timestamp for the end of the execution 112 113 """ 114 __slots__ = ["input", "status", "result", "log", "priority", 115 "start_timestamp", "exec_timestamp", "end_timestamp", 116 "__weakref__"] 117
118 - def __init__(self, op):
119 """Initializes instances of this class. 120 121 @type op: L{opcodes.OpCode} 122 @param op: the opcode we encapsulate 123 124 """ 125 self.input = op 126 self.status = constants.OP_STATUS_QUEUED 127 self.result = None 128 self.log = [] 129 self.start_timestamp = None 130 self.exec_timestamp = None 131 self.end_timestamp = None 132 133 # Get initial priority (it might change during the lifetime of this opcode) 134 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
135 136 @classmethod
137 - def Restore(cls, state):
138 """Restore the _QueuedOpCode from the serialized form. 139 140 @type state: dict 141 @param state: the serialized state 142 @rtype: _QueuedOpCode 143 @return: a new _QueuedOpCode instance 144 145 """ 146 obj = _QueuedOpCode.__new__(cls) 147 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 148 obj.status = state["status"] 149 obj.result = state["result"] 150 obj.log = state["log"] 151 obj.start_timestamp = state.get("start_timestamp", None) 152 obj.exec_timestamp = state.get("exec_timestamp", None) 153 obj.end_timestamp = state.get("end_timestamp", None) 154 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 155 return obj
156
157 - def Serialize(self):
158 """Serializes this _QueuedOpCode. 159 160 @rtype: dict 161 @return: the dictionary holding the serialized state 162 163 """ 164 return { 165 "input": self.input.__getstate__(), 166 "status": self.status, 167 "result": self.result, 168 "log": self.log, 169 "start_timestamp": self.start_timestamp, 170 "exec_timestamp": self.exec_timestamp, 171 "end_timestamp": self.end_timestamp, 172 "priority": self.priority, 173 }
174
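A minimal, hypothetical usage sketch of the Serialize()/Restore() round trip provided by _QueuedOpCode above (not part of ganeti/jqueue.py; it assumes an opcode such as opcodes.OpTestDelay is available in this Ganeti version):

# Hypothetical sketch, not part of the original module.
op = opcodes.OpTestDelay(duration=0)
queued = _QueuedOpCode(op)
state = queued.Serialize()               # plain dict, suitable for serializer.DumpJson
restored = _QueuedOpCode.Restore(state)
assert restored.status == constants.OP_STATUS_QUEUED
assert restored.priority == queued.priority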
175 176 -class _QueuedJob(object):
177 """In-memory job representation. 178 179 This is what we use to track the user-submitted jobs. Locking must 180 be taken care of by users of this class. 181 182 @type queue: L{JobQueue} 183 @ivar queue: the parent queue 184 @ivar id: the job ID 185 @type ops: list 186 @ivar ops: the list of _QueuedOpCode that constitute the job 187 @type log_serial: int 188 @ivar log_serial: holds the index for the next log entry 189 @ivar received_timestamp: the timestamp for when the job was received 190 @ivar start_timestamp: the timestamp for start of execution 191 @ivar end_timestamp: the timestamp for end of execution 192 @ivar writable: Whether the job is allowed to be modified 193 194 """ 195 # pylint: disable=W0212 196 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 197 "received_timestamp", "start_timestamp", "end_timestamp", 198 "writable", "archived", 199 "livelock", "process_id", 200 "__weakref__"] 201
202 - def AddReasons(self, pickup=False):
203 """Extend the reason trail 204 205 Add the reason for all the opcodes of this job to be executed. 206 207 """ 208 count = 0 209 for queued_op in self.ops: 210 op = queued_op.input 211 if pickup: 212 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP 213 else: 214 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE 215 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, 216 reason_src_prefix) 217 reason_text = "job=%d;index=%d" % (self.id, count) 218 reason = getattr(op, "reason", []) 219 reason.append((reason_src, reason_text, utils.EpochNano())) 220 op.reason = reason 221 count = count + 1
222
223 - def __init__(self, queue, job_id, ops, writable):
224 """Constructor for the _QueuedJob. 225 226 @type queue: L{JobQueue} 227 @param queue: our parent queue 228 @type job_id: job_id 229 @param job_id: our job id 230 @type ops: list 231 @param ops: the list of opcodes we hold, which will be encapsulated 232 in _QueuedOpCodes 233 @type writable: bool 234 @param writable: Whether job can be modified 235 236 """ 237 if not ops: 238 raise errors.GenericError("A job needs at least one opcode") 239 240 self.queue = queue 241 self.id = int(job_id) 242 self.ops = [_QueuedOpCode(op) for op in ops] 243 self.AddReasons() 244 self.log_serial = 0 245 self.received_timestamp = TimeStampNow() 246 self.start_timestamp = None 247 self.end_timestamp = None 248 self.archived = False 249 self.livelock = None 250 self.process_id = None 251 252 self.writable = None 253 254 self._InitInMemory(self, writable) 255 256 assert not self.archived, "New jobs can not be marked as archived"
257 258 @staticmethod
259 - def _InitInMemory(obj, writable):
260 """Initializes in-memory variables. 261 262 """ 263 obj.writable = writable 264 obj.ops_iter = None 265 obj.cur_opctx = None
266
267 - def __repr__(self):
268 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 269 "id=%s" % self.id, 270 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 271 272 return "<%s at %#x>" % (" ".join(status), id(self))
273 274 @classmethod
275 - def Restore(cls, queue, state, writable, archived):
276 """Restore a _QueuedJob from serialized state. 277 278 @type queue: L{JobQueue} 279 @param queue: to which queue the restored job belongs 280 @type state: dict 281 @param state: the serialized state 282 @type writable: bool 283 @param writable: Whether job can be modified 284 @type archived: bool 285 @param archived: Whether job was already archived 286 @rtype: _QueuedJob 287 @return: the restored _QueuedJob instance 288 289 """ 290 obj = _QueuedJob.__new__(cls) 291 obj.queue = queue 292 obj.id = int(state["id"]) 293 obj.received_timestamp = state.get("received_timestamp", None) 294 obj.start_timestamp = state.get("start_timestamp", None) 295 obj.end_timestamp = state.get("end_timestamp", None) 296 obj.archived = archived 297 obj.livelock = state.get("livelock", None) 298 obj.process_id = state.get("process_id", None) 299 if obj.process_id is not None: 300 obj.process_id = int(obj.process_id) 301 302 obj.ops = [] 303 obj.log_serial = 0 304 for op_state in state["ops"]: 305 op = _QueuedOpCode.Restore(op_state) 306 for log_entry in op.log: 307 obj.log_serial = max(obj.log_serial, log_entry[0]) 308 obj.ops.append(op) 309 310 cls._InitInMemory(obj, writable) 311 312 return obj
313
314 - def Serialize(self):
315 """Serialize the _QueuedJob instance. 316 317 @rtype: dict 318 @return: the serialized state 319 320 """ 321 return { 322 "id": self.id, 323 "ops": [op.Serialize() for op in self.ops], 324 "start_timestamp": self.start_timestamp, 325 "end_timestamp": self.end_timestamp, 326 "received_timestamp": self.received_timestamp, 327 "livelock": self.livelock, 328 "process_id": self.process_id, 329 }
330
331 - def CalcStatus(self):
332 """Compute the status of this job. 333 334 This function iterates over all the _QueuedOpCodes in the job and 335 based on their status, computes the job status. 336 337 The algorithm is: 338 - if we find a cancelled, or finished with error, the job 339 status will be the same 340 - otherwise, the last opcode with the status one of: 341 - waitlock 342 - canceling 343 - running 344 345 will determine the job status 346 347 - otherwise, it means either all opcodes are queued, or success, 348 and the job status will be the same 349 350 @return: the job status 351 352 """ 353 status = constants.JOB_STATUS_QUEUED 354 355 all_success = True 356 for op in self.ops: 357 if op.status == constants.OP_STATUS_SUCCESS: 358 continue 359 360 all_success = False 361 362 if op.status == constants.OP_STATUS_QUEUED: 363 pass 364 elif op.status == constants.OP_STATUS_WAITING: 365 status = constants.JOB_STATUS_WAITING 366 elif op.status == constants.OP_STATUS_RUNNING: 367 status = constants.JOB_STATUS_RUNNING 368 elif op.status == constants.OP_STATUS_CANCELING: 369 status = constants.JOB_STATUS_CANCELING 370 break 371 elif op.status == constants.OP_STATUS_ERROR: 372 status = constants.JOB_STATUS_ERROR 373 # The whole job fails if one opcode failed 374 break 375 elif op.status == constants.OP_STATUS_CANCELED: 376 status = constants.OP_STATUS_CANCELED 377 break 378 379 if all_success: 380 status = constants.JOB_STATUS_SUCCESS 381 382 return status
383
384 - def CalcPriority(self):
385 """Gets the current priority for this job. 386 387 Only unfinished opcodes are considered. When all are done, the default 388 priority is used. 389 390 @rtype: int 391 392 """ 393 priorities = [op.priority for op in self.ops 394 if op.status not in constants.OPS_FINALIZED] 395 396 if not priorities: 397 # All opcodes are done, assume default priority 398 return constants.OP_PRIO_DEFAULT 399 400 return min(priorities)
401
402 - def GetLogEntries(self, newer_than):
403 """Selectively returns the log entries. 404 405 @type newer_than: None or int 406 @param newer_than: if this is None, return all log entries, 407 otherwise return only the log entries with serial higher 408 than this value 409 @rtype: list 410 @return: the list of the log entries selected 411 412 """ 413 if newer_than is None: 414 serial = -1 415 else: 416 serial = newer_than 417 418 entries = [] 419 for op in self.ops: 420 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 421 422 return entries
423
424 - def MarkUnfinishedOps(self, status, result):
425 """Mark unfinished opcodes with a given status and result. 426 427 This is a utility function for marking all running or waiting to 428 be run opcodes with a given status. Opcodes which are already 429 finalized are not changed. 430 431 @param status: a given opcode status 432 @param result: the opcode result 433 434 """ 435 not_marked = True 436 for op in self.ops: 437 if op.status in constants.OPS_FINALIZED: 438 assert not_marked, "Finalized opcodes found after non-finalized ones" 439 continue 440 op.status = status 441 op.result = result 442 not_marked = False
443
444 - def Finalize(self):
445 """Marks the job as finalized. 446 447 """ 448 self.end_timestamp = TimeStampNow()
449
450 - def Cancel(self):
451 """Marks job as canceled/-ing if possible. 452 453 @rtype: tuple; (bool, string) 454 @return: Boolean describing whether job was successfully canceled or marked 455 as canceling and a text message 456 457 """ 458 status = self.CalcStatus() 459 460 if status == constants.JOB_STATUS_QUEUED: 461 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 462 "Job canceled by request") 463 self.Finalize() 464 return (True, "Job %s canceled" % self.id) 465 466 elif status == constants.JOB_STATUS_WAITING: 467 # The worker will notice the new status and cancel the job 468 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 469 return (True, "Job %s will be canceled" % self.id) 470 471 else: 472 logging.debug("Job %s is no longer waiting in the queue", self.id) 473 return (False, "Job %s is no longer waiting in the queue" % self.id)
474
475 - def ChangePriority(self, priority):
476 """Changes the job priority. 477 478 @type priority: int 479 @param priority: New priority 480 @rtype: tuple; (bool, string) 481 @return: Boolean describing whether job's priority was successfully changed 482 and a text message 483 484 """ 485 status = self.CalcStatus() 486 487 if status in constants.JOBS_FINALIZED: 488 return (False, "Job %s is finished" % self.id) 489 elif status == constants.JOB_STATUS_CANCELING: 490 return (False, "Job %s is cancelling" % self.id) 491 else: 492 assert status in (constants.JOB_STATUS_QUEUED, 493 constants.JOB_STATUS_WAITING, 494 constants.JOB_STATUS_RUNNING) 495 496 changed = False 497 for op in self.ops: 498 if (op.status == constants.OP_STATUS_RUNNING or 499 op.status in constants.OPS_FINALIZED): 500 assert not changed, \ 501 ("Found opcode for which priority should not be changed after" 502 " priority has been changed for previous opcodes") 503 continue 504 505 assert op.status in (constants.OP_STATUS_QUEUED, 506 constants.OP_STATUS_WAITING) 507 508 changed = True 509 510 # Set new priority (doesn't modify opcode input) 511 op.priority = priority 512 513 if changed: 514 return (True, ("Priorities of pending opcodes for job %s have been" 515 " changed to %s" % (self.id, priority))) 516 else: 517 return (False, "Job %s had no pending opcodes" % self.id)
518
519 - def SetPid(self, pid):
520 """Sets the job's process ID 521 522 @type pid: int 523 @param pid: the process ID 524 525 """ 526 status = self.CalcStatus() 527 528 if status in (constants.JOB_STATUS_QUEUED, 529 constants.JOB_STATUS_WAITING): 530 if self.process_id is not None: 531 logging.warning("Replacing the process id %s of job %s with %s", 532 self.process_id, self.id, pid) 533 self.process_id = pid 534 else: 535 logging.warning("Can set pid only for queued/waiting jobs")
536
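A minimal, hypothetical sketch of the _QueuedJob status and priority helpers defined above (not part of ganeti/jqueue.py; the parent queue is not needed for these two calls, and opcodes.OpTestDelay is assumed to be available):

# Hypothetical sketch, not part of the original module.
job = _QueuedJob(None, 42, [opcodes.OpTestDelay(duration=0)], True)
assert job.CalcStatus() == constants.JOB_STATUS_QUEUED    # nothing has run yet
assert job.CalcPriority() == constants.OP_PRIO_DEFAULT    # no opcode overrides the priority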
537 538 -class _OpExecCallbacks(mcpu.OpExecCbBase):
539
540 - def __init__(self, queue, job, op):
541 """Initializes this class. 542 543 @type queue: L{JobQueue} 544 @param queue: Job queue 545 @type job: L{_QueuedJob} 546 @param job: Job object 547 @type op: L{_QueuedOpCode} 548 @param op: OpCode 549 550 """ 551 super(_OpExecCallbacks, self).__init__() 552 553 assert queue, "Queue is missing" 554 assert job, "Job is missing" 555 assert op, "Opcode is missing" 556 557 self._queue = queue 558 self._job = job 559 self._op = op
560
561 - def _CheckCancel(self):
562 """Raises an exception to cancel the job if asked to. 563 564 """ 565 # Cancel here if we were asked to 566 if self._op.status == constants.OP_STATUS_CANCELING: 567 logging.debug("Canceling opcode") 568 raise CancelJob()
569
570 - def NotifyStart(self):
571 """Mark the opcode as running, not lock-waiting. 572 573 This is called from the mcpu code as a notifier function, when the LU is 574 finally about to start the Exec() method. Of course, to have end-user 575 visible results, the opcode must be initially (before calling into 576 Processor.ExecOpCode) set to OP_STATUS_WAITING. 577 578 """ 579 assert self._op in self._job.ops 580 assert self._op.status in (constants.OP_STATUS_WAITING, 581 constants.OP_STATUS_CANCELING) 582 583 # Cancel here if we were asked to 584 self._CheckCancel() 585 586 logging.debug("Opcode is now running") 587 588 self._op.status = constants.OP_STATUS_RUNNING 589 self._op.exec_timestamp = TimeStampNow() 590 591 # And finally replicate the job status 592 self._queue.UpdateJobUnlocked(self._job)
593
594 - def NotifyRetry(self):
595 """Mark opcode again as lock-waiting. 596 597 This is called from the mcpu code just after calling PrepareRetry. 598 The opcode will now again acquire locks (more, hopefully). 599 600 """ 601 self._op.status = constants.OP_STATUS_WAITING 602 logging.debug("Opcode will be retried. Back to waiting.")
603
604 - def _AppendFeedback(self, timestamp, log_type, log_msg):
605 """Internal feedback append function, with locks 606 607 """ 608 self._job.log_serial += 1 609 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 610 self._queue.UpdateJobUnlocked(self._job, replicate=False)
611
612 - def Feedback(self, *args):
613 """Append a log entry. 614 615 """ 616 assert len(args) < 3 617 618 if len(args) == 1: 619 log_type = constants.ELOG_MESSAGE 620 log_msg = args[0] 621 else: 622 (log_type, log_msg) = args 623 624 # The time is split to make serialization easier and not lose 625 # precision. 626 timestamp = utils.SplitTime(time.time()) 627 self._AppendFeedback(timestamp, log_type, log_msg)
628
629 - def CurrentPriority(self):
630 """Returns current priority for opcode. 631 632 """ 633 assert self._op.status in (constants.OP_STATUS_WAITING, 634 constants.OP_STATUS_CANCELING) 635 636 # Cancel here if we were asked to 637 self._CheckCancel() 638 639 return self._op.priority
640
641 - def SubmitManyJobs(self, jobs):
642 """Submits jobs for processing. 643 644 See L{JobQueue.SubmitManyJobs}. 645 646 """ 647 # Locking is done in job queue 648 return self._queue.SubmitManyJobs(jobs)
649
650 651 -def _EncodeOpError(err):
652 """Encodes an error which occurred while processing an opcode. 653 654 """ 655 if isinstance(err, errors.GenericError): 656 to_encode = err 657 else: 658 to_encode = errors.OpExecError(str(err)) 659 660 return errors.EncodeException(to_encode)
661
662 663 -class _TimeoutStrategyWrapper:
664 - def __init__(self, fn):
665 """Initializes this class. 666 667 """ 668 self._fn = fn 669 self._next = None
670
671 - def _Advance(self):
672 """Gets the next timeout if necessary. 673 674 """ 675 if self._next is None: 676 self._next = self._fn()
677
678 - def Peek(self):
679 """Returns the next timeout. 680 681 """ 682 self._Advance() 683 return self._next
684
685 - def Next(self):
686 """Returns the current timeout and advances the internal state. 687 688 """ 689 self._Advance() 690 result = self._next 691 self._next = None 692 return result
693
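A minimal, hypothetical sketch of how _TimeoutStrategyWrapper memoizes the next timeout: Peek() computes and caches a value without consuming it, while Next() consumes it (not part of ganeti/jqueue.py; the timeout source below is an arbitrary stand-in for a real strategy's NextAttempt):

# Hypothetical sketch, not part of the original module.
timeouts = iter([1.0, 2.0])
wrapper = _TimeoutStrategyWrapper(lambda: next(timeouts))
assert wrapper.Peek() == 1.0   # computed once and cached
assert wrapper.Peek() == 1.0   # still cached, nothing consumed
assert wrapper.Next() == 1.0   # consumes the cached value
assert wrapper.Next() == 2.0   # advances to the next timeout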
694 695 -class _OpExecContext:
696 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
697 """Initializes this class. 698 699 """ 700 self.op = op 701 self.index = index 702 self.log_prefix = log_prefix 703 self.summary = op.input.Summary() 704 705 # Create local copy to modify 706 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 707 self.jobdeps = op.input.depends[:] 708 else: 709 self.jobdeps = None 710 711 self._timeout_strategy_factory = timeout_strategy_factory 712 self._ResetTimeoutStrategy()
713
714 - def _ResetTimeoutStrategy(self):
715 """Creates a new timeout strategy. 716 717 """ 718 self._timeout_strategy = \ 719 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
720
721 - def CheckPriorityIncrease(self):
722 """Checks whether priority can and should be increased. 723 724 Called when locks couldn't be acquired. 725 726 """ 727 op = self.op 728 729 # Exhausted all retries and next round should not use blocking acquire 730 # for locks? 731 if (self._timeout_strategy.Peek() is None and 732 op.priority > constants.OP_PRIO_HIGHEST): 733 logging.debug("Increasing priority") 734 op.priority -= 1 735 self._ResetTimeoutStrategy() 736 return True 737 738 return False
739
740 - def GetNextLockTimeout(self):
741 """Returns the next lock acquire timeout. 742 743 """ 744 return self._timeout_strategy.Next()
745
746 747 -class _JobProcessor(object):
748 (DEFER, 749 WAITDEP, 750 FINISHED) = range(1, 4) 751
752 - def __init__(self, queue, opexec_fn, job, 753 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
754 """Initializes this class. 755 756 """ 757 self.queue = queue 758 self.opexec_fn = opexec_fn 759 self.job = job 760 self._timeout_strategy_factory = _timeout_strategy_factory
761 762 @staticmethod
763 - def _FindNextOpcode(job, timeout_strategy_factory):
764 """Locates the next opcode to run. 765 766 @type job: L{_QueuedJob} 767 @param job: Job object 768 @param timeout_strategy_factory: Callable to create new timeout strategy 769 770 """ 771 # Create some sort of a cache to speed up locating next opcode for future 772 # lookups 773 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 774 # pending and one for processed ops. 775 if job.ops_iter is None: 776 job.ops_iter = enumerate(job.ops) 777 778 # Find next opcode to run 779 while True: 780 try: 781 (idx, op) = job.ops_iter.next() 782 except StopIteration: 783 raise errors.ProgrammerError("Called for a finished job") 784 785 if op.status == constants.OP_STATUS_RUNNING: 786 # Found an opcode already marked as running 787 raise errors.ProgrammerError("Called for job marked as running") 788 789 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 790 timeout_strategy_factory) 791 792 if op.status not in constants.OPS_FINALIZED: 793 return opctx 794 795 # This is a job that was partially completed before master daemon 796 # shutdown, so it can be expected that some opcodes are already 797 # completed successfully (if any did error out, then the whole job 798 # should have been aborted and not resubmitted for processing). 799 logging.info("%s: opcode %s already processed, skipping", 800 opctx.log_prefix, opctx.summary)
801 802 @staticmethod
803 - def _MarkWaitlock(job, op):
804 """Marks an opcode as waiting for locks. 805 806 The job's start timestamp is also set if necessary. 807 808 @type job: L{_QueuedJob} 809 @param job: Job object 810 @type op: L{_QueuedOpCode} 811 @param op: Opcode object 812 813 """ 814 assert op in job.ops 815 assert op.status in (constants.OP_STATUS_QUEUED, 816 constants.OP_STATUS_WAITING) 817 818 update = False 819 820 op.result = None 821 822 if op.status == constants.OP_STATUS_QUEUED: 823 op.status = constants.OP_STATUS_WAITING 824 update = True 825 826 if op.start_timestamp is None: 827 op.start_timestamp = TimeStampNow() 828 update = True 829 830 if job.start_timestamp is None: 831 job.start_timestamp = op.start_timestamp 832 update = True 833 834 assert op.status == constants.OP_STATUS_WAITING 835 836 return update
837 838 @staticmethod
839 - def _CheckDependencies(queue, job, opctx):
840 """Checks if an opcode has dependencies and if so, processes them. 841 842 @type queue: L{JobQueue} 843 @param queue: Queue object 844 @type job: L{_QueuedJob} 845 @param job: Job object 846 @type opctx: L{_OpExecContext} 847 @param opctx: Opcode execution context 848 @rtype: bool 849 @return: Whether opcode will be re-scheduled by dependency tracker 850 851 """ 852 op = opctx.op 853 854 result = False 855 856 while opctx.jobdeps: 857 (dep_job_id, dep_status) = opctx.jobdeps[0] 858 859 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 860 dep_status) 861 assert ht.TNonEmptyString(depmsg), "No dependency message" 862 863 logging.info("%s: %s", opctx.log_prefix, depmsg) 864 865 if depresult == _JobDependencyManager.CONTINUE: 866 # Remove dependency and continue 867 opctx.jobdeps.pop(0) 868 869 elif depresult == _JobDependencyManager.WAIT: 870 # Need to wait for notification, dependency tracker will re-add job 871 # to workerpool 872 result = True 873 break 874 875 elif depresult == _JobDependencyManager.CANCEL: 876 # Job was cancelled, cancel this job as well 877 job.Cancel() 878 assert op.status == constants.OP_STATUS_CANCELING 879 break 880 881 elif depresult in (_JobDependencyManager.WRONGSTATUS, 882 _JobDependencyManager.ERROR): 883 # Job failed or there was an error, this job must fail 884 op.status = constants.OP_STATUS_ERROR 885 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 886 break 887 888 else: 889 raise errors.ProgrammerError("Unknown dependency result '%s'" % 890 depresult) 891 892 return result
893
894 - def _ExecOpCodeUnlocked(self, opctx):
895 """Processes one opcode and returns the result. 896 897 """ 898 op = opctx.op 899 900 assert op.status in (constants.OP_STATUS_WAITING, 901 constants.OP_STATUS_CANCELING) 902 903 # The very last check if the job was cancelled before trying to execute 904 if op.status == constants.OP_STATUS_CANCELING: 905 return (constants.OP_STATUS_CANCELING, None) 906 907 timeout = opctx.GetNextLockTimeout() 908 909 try: 910 # Make sure not to hold queue lock while calling ExecOpCode 911 result = self.opexec_fn(op.input, 912 _OpExecCallbacks(self.queue, self.job, op), 913 timeout=timeout) 914 except mcpu.LockAcquireTimeout: 915 assert timeout is not None, "Received timeout for blocking acquire" 916 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 917 918 assert op.status in (constants.OP_STATUS_WAITING, 919 constants.OP_STATUS_CANCELING) 920 921 # Was job cancelled while we were waiting for the lock? 922 if op.status == constants.OP_STATUS_CANCELING: 923 return (constants.OP_STATUS_CANCELING, None) 924 925 # Stay in waitlock while trying to re-acquire lock 926 return (constants.OP_STATUS_WAITING, None) 927 except CancelJob: 928 logging.exception("%s: Canceling job", opctx.log_prefix) 929 assert op.status == constants.OP_STATUS_CANCELING 930 return (constants.OP_STATUS_CANCELING, None) 931 932 except Exception, err: # pylint: disable=W0703 933 logging.exception("%s: Caught exception in %s", 934 opctx.log_prefix, opctx.summary) 935 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 936 else: 937 logging.debug("%s: %s successful", 938 opctx.log_prefix, opctx.summary) 939 return (constants.OP_STATUS_SUCCESS, result)
940
941 - def __call__(self, _nextop_fn=None):
942 """Continues execution of a job. 943 944 @param _nextop_fn: Callback function for tests 945 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 946 be deferred and C{WAITDEP} if the dependency manager 947 (L{_JobDependencyManager}) will re-schedule the job when appropriate 948 949 """ 950 queue = self.queue 951 job = self.job 952 953 logging.debug("Processing job %s", job.id) 954 955 try: 956 opcount = len(job.ops) 957 958 assert job.writable, "Expected writable job" 959 960 # Don't do anything for finalized jobs 961 if job.CalcStatus() in constants.JOBS_FINALIZED: 962 return self.FINISHED 963 964 # Is a previous opcode still pending? 965 if job.cur_opctx: 966 opctx = job.cur_opctx 967 job.cur_opctx = None 968 else: 969 if __debug__ and _nextop_fn: 970 _nextop_fn() 971 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 972 973 op = opctx.op 974 975 # Consistency check 976 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 977 constants.OP_STATUS_CANCELING) 978 for i in job.ops[opctx.index + 1:]) 979 980 assert op.status in (constants.OP_STATUS_QUEUED, 981 constants.OP_STATUS_WAITING, 982 constants.OP_STATUS_CANCELING) 983 984 assert (op.priority <= constants.OP_PRIO_LOWEST and 985 op.priority >= constants.OP_PRIO_HIGHEST) 986 987 waitjob = None 988 989 if op.status != constants.OP_STATUS_CANCELING: 990 assert op.status in (constants.OP_STATUS_QUEUED, 991 constants.OP_STATUS_WAITING) 992 993 # Prepare to start opcode 994 if self._MarkWaitlock(job, op): 995 # Write to disk 996 queue.UpdateJobUnlocked(job) 997 998 assert op.status == constants.OP_STATUS_WAITING 999 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1000 assert job.start_timestamp and op.start_timestamp 1001 assert waitjob is None 1002 1003 # Check if waiting for a job is necessary 1004 waitjob = self._CheckDependencies(queue, job, opctx) 1005 1006 assert op.status in (constants.OP_STATUS_WAITING, 1007 constants.OP_STATUS_CANCELING, 1008 constants.OP_STATUS_ERROR) 1009 1010 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1011 constants.OP_STATUS_ERROR)): 1012 logging.info("%s: opcode %s waiting for locks", 1013 opctx.log_prefix, opctx.summary) 1014 1015 assert not opctx.jobdeps, "Not all dependencies were removed" 1016 1017 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1018 1019 op.status = op_status 1020 op.result = op_result 1021 1022 assert not waitjob 1023 1024 if op.status in (constants.OP_STATUS_WAITING, 1025 constants.OP_STATUS_QUEUED): 1026 # waiting: Couldn't get locks in time 1027 # queued: Queue is shutting down 1028 assert not op.end_timestamp 1029 else: 1030 # Finalize opcode 1031 op.end_timestamp = TimeStampNow() 1032 1033 if op.status == constants.OP_STATUS_CANCELING: 1034 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1035 for i in job.ops[opctx.index:]) 1036 else: 1037 assert op.status in constants.OPS_FINALIZED 1038 1039 if op.status == constants.OP_STATUS_QUEUED: 1040 # Queue is shutting down 1041 assert not waitjob 1042 1043 finalize = False 1044 1045 # Reset context 1046 job.cur_opctx = None 1047 1048 # In no case must the status be finalized here 1049 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1050 1051 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1052 finalize = False 1053 1054 if not waitjob and opctx.CheckPriorityIncrease(): 1055 # Priority was changed, need to update on-disk file 1056 queue.UpdateJobUnlocked(job) 1057 1058 # Keep around for another round 1059 job.cur_opctx = 
opctx 1060 1061 assert (op.priority <= constants.OP_PRIO_LOWEST and 1062 op.priority >= constants.OP_PRIO_HIGHEST) 1063 1064 # In no case must the status be finalized here 1065 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1066 1067 else: 1068 # Ensure all opcodes so far have been successful 1069 assert (opctx.index == 0 or 1070 compat.all(i.status == constants.OP_STATUS_SUCCESS 1071 for i in job.ops[:opctx.index])) 1072 1073 # Reset context 1074 job.cur_opctx = None 1075 1076 if op.status == constants.OP_STATUS_SUCCESS: 1077 finalize = False 1078 1079 elif op.status == constants.OP_STATUS_ERROR: 1080 # If we get here, we cannot afford to check for any consistency 1081 # any more, we just want to clean up. 1082 # TODO: Actually, it wouldn't be a bad idea to start a timer 1083 # here to kill the whole process. 1084 to_encode = errors.OpExecError("Preceding opcode failed") 1085 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1086 _EncodeOpError(to_encode)) 1087 finalize = True 1088 elif op.status == constants.OP_STATUS_CANCELING: 1089 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1090 "Job canceled by request") 1091 finalize = True 1092 1093 else: 1094 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1095 1096 if opctx.index == (opcount - 1): 1097 # Finalize on last opcode 1098 finalize = True 1099 1100 if finalize: 1101 # All opcodes have been run, finalize job 1102 job.Finalize() 1103 1104 # Write to disk. If the job status is final, this is the final write 1105 # allowed. Once the file has been written, it can be archived anytime. 1106 queue.UpdateJobUnlocked(job) 1107 1108 assert not waitjob 1109 1110 if finalize: 1111 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1112 return self.FINISHED 1113 1114 assert not waitjob or queue.depmgr.JobWaiting(job) 1115 1116 if waitjob: 1117 return self.WAITDEP 1118 else: 1119 return self.DEFER 1120 finally: 1121 assert job.writable, "Job became read-only while being processed"
1122
1123 1124 -class _JobDependencyManager:
1125 """Keeps track of job dependencies. 1126 1127 """ 1128 (WAIT, 1129 ERROR, 1130 CANCEL, 1131 CONTINUE, 1132 WRONGSTATUS) = range(1, 6) 1133
1134 - def __init__(self, getstatus_fn):
1135 """Initializes this class. 1136 1137 """ 1138 self._getstatus_fn = getstatus_fn 1139 1140 self._waiters = {}
1141
1142 - def JobWaiting(self, job):
1143 """Checks if a job is waiting. 1144 1145 """ 1146 return compat.any(job in jobs 1147 for jobs in self._waiters.values())
1148
1149 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1150 """Checks if a dependency job has the requested status. 1151 1152 If the other job is not yet in a finalized status, the calling job will be 1153 notified (re-added to the workerpool) at a later point. 1154 1155 @type job: L{_QueuedJob} 1156 @param job: Job object 1157 @type dep_job_id: int 1158 @param dep_job_id: ID of dependency job 1159 @type dep_status: list 1160 @param dep_status: Required status 1161 1162 """ 1163 assert ht.TJobId(job.id) 1164 assert ht.TJobId(dep_job_id) 1165 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1166 1167 if job.id == dep_job_id: 1168 return (self.ERROR, "Job can't depend on itself") 1169 1170 # Get status of dependency job 1171 try: 1172 status = self._getstatus_fn(dep_job_id) 1173 except errors.JobLost, err: 1174 return (self.ERROR, "Dependency error: %s" % err) 1175 1176 assert status in constants.JOB_STATUS_ALL 1177 1178 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1179 1180 if status not in constants.JOBS_FINALIZED: 1181 # Register for notification and wait for job to finish 1182 job_id_waiters.add(job) 1183 return (self.WAIT, 1184 "Need to wait for job %s, wanted status '%s'" % 1185 (dep_job_id, dep_status)) 1186 1187 # Remove from waiters list 1188 if job in job_id_waiters: 1189 job_id_waiters.remove(job) 1190 1191 if (status == constants.JOB_STATUS_CANCELED and 1192 constants.JOB_STATUS_CANCELED not in dep_status): 1193 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1194 1195 elif not dep_status or status in dep_status: 1196 return (self.CONTINUE, 1197 "Dependency job %s finished with status '%s'" % 1198 (dep_job_id, status)) 1199 1200 else: 1201 return (self.WRONGSTATUS, 1202 "Dependency job %s finished with status '%s'," 1203 " not one of '%s' as required" % 1204 (dep_job_id, status, utils.CommaJoin(dep_status)))
1205
1207 """Remove all jobs without actual waiters. 1208 1209 """ 1210 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1211 if not waiters]: 1212 del self._waiters[job_id]
1213
1214 1215 -class JobQueue(object):
1216 """Queue used to manage the jobs. 1217 1218 """
1219 - def __init__(self, context, cfg):
1220 """Constructor for JobQueue. 1221 1222 The constructor will initialize the job queue object and then 1223 start loading the current jobs from disk, either for starting them 1224 (if they were queued) or for aborting them (if they were already 1225 running). 1226 1227 @type context: GanetiContext 1228 @param context: the context object for access to the configuration 1229 data and other ganeti objects 1230 1231 """ 1232 self.context = context 1233 self._memcache = weakref.WeakValueDictionary() 1234 self._my_hostname = netutils.Hostname.GetSysName() 1235 1236 # Get initial list of nodes 1237 self._nodes = dict((n.name, n.primary_ip) 1238 for n in cfg.GetAllNodesInfo().values() 1239 if n.master_candidate) 1240 1241 # Remove master node 1242 self._nodes.pop(self._my_hostname, None) 1243 1244 # Job dependencies 1245 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
1246
1247 - def _GetRpc(self, address_list):
1248 """Gets RPC runner with context. 1249 1250 """ 1251 return rpc.JobQueueRunner(self.context, address_list)
1252 1253 @staticmethod
1254 - def _CheckRpcResult(result, nodes, failmsg):
1255 """Verifies the status of an RPC call. 1256 1257 Since we aim to keep consistency should this node (the current 1258 master) fail, we will log errors if our RPC calls fail, and especially 1259 log the case when more than half of the nodes fail. 1260 1261 @param result: the data as returned from the rpc call 1262 @type nodes: list 1263 @param nodes: the list of nodes we made the call to 1264 @type failmsg: str 1265 @param failmsg: the identifier to be used for logging 1266 1267 """ 1268 failed = [] 1269 success = [] 1270 1271 for node in nodes: 1272 msg = result[node].fail_msg 1273 if msg: 1274 failed.append(node) 1275 logging.error("RPC call %s (%s) failed on node %s: %s", 1276 result[node].call, failmsg, node, msg) 1277 else: 1278 success.append(node) 1279 1280 # +1 for the master node 1281 if (len(success) + 1) < len(failed): 1282 # TODO: Handle failing nodes 1283 logging.error("More than half of the nodes failed")
1284
1285 - def _GetNodeIp(self):
1286 """Helper for returning the node name/ip list. 1287 1288 @rtype: (list, list) 1289 @return: a tuple of two lists, the first one with the node 1290 names and the second one with the node addresses 1291 1292 """ 1293 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1294 name_list = self._nodes.keys() 1295 addr_list = [self._nodes[name] for name in name_list] 1296 return name_list, addr_list
1297
1298 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1299 """Writes a file locally and then replicates it to all nodes. 1300 1301 This function will replace the contents of a file on the local 1302 node and then replicate it to all the other nodes we have. 1303 1304 @type file_name: str 1305 @param file_name: the path of the file to be replicated 1306 @type data: str 1307 @param data: the new contents of the file 1308 @type replicate: boolean 1309 @param replicate: whether to spread the changes to the remote nodes 1310 1311 """ 1312 getents = runtime.GetEnts() 1313 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1314 gid=getents.daemons_gid, 1315 mode=constants.JOB_QUEUE_FILES_PERMS) 1316 1317 if replicate: 1318 names, addrs = self._GetNodeIp() 1319 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1320 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1321
1322 - def _RenameFilesUnlocked(self, rename):
1323 """Renames a file locally and then replicate the change. 1324 1325 This function will rename a file in the local queue directory 1326 and then replicate this rename to all the other nodes we have. 1327 1328 @type rename: list of (old, new) 1329 @param rename: List containing tuples mapping old to new names 1330 1331 """ 1332 # Rename them locally 1333 for old, new in rename: 1334 utils.RenameFile(old, new, mkdir=True) 1335 1336 # ... and on all nodes 1337 names, addrs = self._GetNodeIp() 1338 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1339 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1340 1341 @staticmethod
1342 - def _GetJobPath(job_id):
1343 """Returns the job file for a given job id. 1344 1345 @type job_id: str 1346 @param job_id: the job identifier 1347 @rtype: str 1348 @return: the path to the job file 1349 1350 """ 1351 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1352 1353 @staticmethod
1354 - def _GetArchivedJobPath(job_id):
1355 """Returns the archived job file for a give job id. 1356 1357 @type job_id: str 1358 @param job_id: the job identifier 1359 @rtype: str 1360 @return: the path to the archived job file 1361 1362 """ 1363 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1364 jstore.GetArchiveDirectory(job_id), 1365 "job-%s" % job_id)
1366 1367 @staticmethod
1368 - def _DetermineJobDirectories(archived):
1369 """Build list of directories containing job files. 1370 1371 @type archived: bool 1372 @param archived: Whether to include directories for archived jobs 1373 @rtype: list 1374 1375 """ 1376 result = [pathutils.QUEUE_DIR] 1377 1378 if archived: 1379 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 1380 result.extend(map(compat.partial(utils.PathJoin, archive_path), 1381 utils.ListVisibleFiles(archive_path))) 1382 1383 return result
1384 1385 @classmethod
1386 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1387 """Return all known job IDs. 1388 1389 The method only looks at disk because it's a requirement that all 1390 jobs are present on disk (so in the _memcache we don't have any 1391 extra IDs). 1392 1393 @type sort: boolean 1394 @param sort: perform sorting on the returned job ids 1395 @rtype: list 1396 @return: the list of job IDs 1397 1398 """ 1399 jlist = [] 1400 1401 for path in cls._DetermineJobDirectories(archived): 1402 for filename in utils.ListVisibleFiles(path): 1403 m = constants.JOB_FILE_RE.match(filename) 1404 if m: 1405 jlist.append(int(m.group(1))) 1406 1407 if sort: 1408 jlist.sort() 1409 return jlist
1410
1411 - def _LoadJobUnlocked(self, job_id):
1412 """Loads a job from the disk or memory. 1413 1414 Given a job id, this will return the cached job object if 1415 existing, or try to load the job from the disk. If loading from 1416 disk, it will also add the job to the cache. 1417 1418 @type job_id: int 1419 @param job_id: the job id 1420 @rtype: L{_QueuedJob} or None 1421 @return: either None or the job object 1422 1423 """ 1424 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 1425 1426 job = self._memcache.get(job_id, None) 1427 if job: 1428 logging.debug("Found job %s in memcache", job_id) 1429 assert job.writable, "Found read-only job in memcache" 1430 return job 1431 1432 try: 1433 job = JobQueue._LoadJobFromDisk(self, job_id, False) 1434 if job is None: 1435 return job 1436 except errors.JobFileCorrupted: 1437 old_path = self._GetJobPath(job_id) 1438 new_path = self._GetArchivedJobPath(job_id) 1439 if old_path == new_path: 1440 # job already archived (future case) 1441 logging.exception("Can't parse job %s", job_id) 1442 else: 1443 # non-archived case 1444 logging.exception("Can't parse job %s, will archive.", job_id) 1445 self._RenameFilesUnlocked([(old_path, new_path)]) 1446 return None 1447 1448 assert job.writable, "Job just loaded is not writable" 1449 1450 self._memcache[job_id] = job 1451 logging.debug("Added job %s to the cache", job_id) 1452 return job
1453 1454 @staticmethod
1455 - def _LoadJobFromDisk(queue, job_id, try_archived, writable=None):
1456 """Load the given job file from disk. 1457 1458 Given a job file, read, load and restore it in a _QueuedJob format. 1459 1460 @type job_id: int 1461 @param job_id: job identifier 1462 @type try_archived: bool 1463 @param try_archived: Whether to try loading an archived job 1464 @rtype: L{_QueuedJob} or None 1465 @return: either None or the job object 1466 1467 """ 1468 path_functions = [(JobQueue._GetJobPath, False)] 1469 1470 if try_archived: 1471 path_functions.append((JobQueue._GetArchivedJobPath, True)) 1472 1473 raw_data = None 1474 archived = None 1475 1476 for (fn, archived) in path_functions: 1477 filepath = fn(job_id) 1478 logging.debug("Loading job from %s", filepath) 1479 try: 1480 raw_data = utils.ReadFile(filepath) 1481 except EnvironmentError, err: 1482 if err.errno != errno.ENOENT: 1483 raise 1484 else: 1485 break 1486 1487 if not raw_data: 1488 logging.debug("No data available for job %s", job_id) 1489 return None 1490 1491 if writable is None: 1492 writable = not archived 1493 1494 try: 1495 data = serializer.LoadJson(raw_data) 1496 job = _QueuedJob.Restore(queue, data, writable, archived) 1497 except Exception, err: # pylint: disable=W0703 1498 raise errors.JobFileCorrupted(err) 1499 1500 return job
1501 1502 @staticmethod
1503 - def SafeLoadJobFromDisk(queue, job_id, try_archived, writable=None):
1504 """Load the given job file from disk. 1505 1506 Given a job file, read, load and restore it in a _QueuedJob format. 1507 In case of error reading the job, it gets returned as None, and the 1508 exception is logged. 1509 1510 @type job_id: int 1511 @param job_id: job identifier 1512 @type try_archived: bool 1513 @param try_archived: Whether to try loading an archived job 1514 @rtype: L{_QueuedJob} or None 1515 @return: either None or the job object 1516 1517 """ 1518 try: 1519 return JobQueue._LoadJobFromDisk(queue, job_id, try_archived, 1520 writable=writable) 1521 except (errors.JobFileCorrupted, EnvironmentError): 1522 logging.exception("Can't load/parse job %s", job_id) 1523 return None
1524 1525 @classmethod
1526 - def SubmitManyJobs(cls, jobs):
1527 """Create and store multiple jobs. 1528 1529 """ 1530 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1531 1532 @staticmethod
1533 - def _ResolveJobDependencies(resolve_fn, deps):
1534 """Resolves relative job IDs in dependencies. 1535 1536 @type resolve_fn: callable 1537 @param resolve_fn: Function to resolve a relative job ID 1538 @type deps: list 1539 @param deps: Dependencies 1540 @rtype: tuple; (boolean, string or list) 1541 @return: If successful (first tuple item), the returned list contains 1542 resolved job IDs along with the requested status; if not successful, 1543 the second element is an error message 1544 1545 """ 1546 result = [] 1547 1548 for (dep_job_id, dep_status) in deps: 1549 if ht.TRelativeJobId(dep_job_id): 1550 assert ht.TInt(dep_job_id) and dep_job_id < 0 1551 try: 1552 job_id = resolve_fn(dep_job_id) 1553 except IndexError: 1554 # Abort 1555 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 1556 else: 1557 job_id = dep_job_id 1558 1559 result.append((job_id, dep_status)) 1560 1561 return (True, result)
1562
1563 - def _GetJobStatusForDependencies(self, job_id):
1564 """Gets the status of a job for dependencies. 1565 1566 @type job_id: int 1567 @param job_id: Job ID 1568 @raise errors.JobLost: If job can't be found 1569 1570 """ 1571 # Not using in-memory cache as doing so would require an exclusive lock 1572 1573 # Try to load from disk 1574 job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False) 1575 1576 if job: 1577 assert not job.writable, "Got writable job" # pylint: disable=E1101 1578 1579 if job: 1580 return job.CalcStatus() 1581 1582 raise errors.JobLost("Job %s not found" % job_id)
1583
1584 - def UpdateJobUnlocked(self, job, replicate=True):
1585 """Update a job's on disk storage. 1586 1587 After a job has been modified, this function needs to be called in 1588 order to write the changes to disk and replicate them to the other 1589 nodes. 1590 1591 @type job: L{_QueuedJob} 1592 @param job: the changed job 1593 @type replicate: boolean 1594 @param replicate: whether to replicate the change to remote nodes 1595 1596 """ 1597 if __debug__: 1598 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1599 assert (finalized ^ (job.end_timestamp is None)) 1600 assert job.writable, "Can't update read-only job" 1601 assert not job.archived, "Can't update archived job" 1602 1603 filename = self._GetJobPath(job.id) 1604 data = serializer.DumpJson(job.Serialize()) 1605 logging.debug("Writing job %s to %s", job.id, filename) 1606 self._UpdateJobQueueFile(filename, data, replicate)
1607
1608 - def HasJobBeenFinalized(self, job_id):
1609 """Checks if a job has been finalized. 1610 1611 @type job_id: int 1612 @param job_id: Job identifier 1613 @rtype: boolean 1614 @return: True if the job has been finalized, 1615 False if the job exists but has not been finalized, 1616 None if the job doesn't exist 1617 1618 """ 1619 job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False) 1620 if job is not None: 1621 return job.CalcStatus() in constants.JOBS_FINALIZED 1622 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: 1623 # FIXME: The above variable is a temporary workaround until the Python job 1624 # queue is completely removed. When removing the job queue, also remove 1625 # the variable from LUClusterDestroy. 1626 return True 1627 else: 1628 return None
1629
1630 - def CancelJob(self, job_id):
1631 """Cancels a job. 1632 1633 This will only succeed if the job has not started yet. 1634 1635 @type job_id: int 1636 @param job_id: job ID of job to be cancelled. 1637 1638 """ 1639 logging.info("Cancelling job %s", job_id) 1640 1641 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1642
1643 - def ChangeJobPriority(self, job_id, priority):
1644 """Changes a job's priority. 1645 1646 @type job_id: int 1647 @param job_id: ID of the job whose priority should be changed 1648 @type priority: int 1649 @param priority: New priority 1650 1651 """ 1652 logging.info("Changing priority of job %s to %s", job_id, priority) 1653 1654 if priority not in constants.OP_PRIO_SUBMIT_VALID: 1655 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 1656 raise errors.GenericError("Invalid priority %s, allowed are %s" % 1657 (priority, allowed)) 1658 1659 def fn(job): 1660 (success, msg) = job.ChangePriority(priority) 1661 return (success, msg)
1662 1663 return self._ModifyJobUnlocked(job_id, fn)
1664
1665 - def _ModifyJobUnlocked(self, job_id, mod_fn):
1666 """Modifies a job. 1667 1668 @type job_id: int 1669 @param job_id: Job ID 1670 @type mod_fn: callable 1671 @param mod_fn: Modifying function, receiving job object as parameter, 1672 returning tuple of (status boolean, message string) 1673 1674 """ 1675 job = self._LoadJobUnlocked(job_id) 1676 if not job: 1677 logging.debug("Job %s not found", job_id) 1678 return (False, "Job %s not found" % job_id) 1679 1680 assert job.writable, "Can't modify read-only job" 1681 assert not job.archived, "Can't modify archived job" 1682 1683 (success, msg) = mod_fn(job) 1684 1685 if success: 1686 # If the job was finalized (e.g. cancelled), this is the final write 1687 # allowed. The job can be archived anytime. 1688 self.UpdateJobUnlocked(job) 1689 1690 return (success, msg)
1691