
Source Code for Package ganeti.jqueue

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator
import os

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster
from ganeti.cmdlib import cluster


JOBQUEUE_THREADS = 1

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
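
# A minimal sketch (added for illustration, not part of the original module)
# of the timestamp convention used by TimeStampNow: utils.SplitTime is
# documented to turn a float time into a (seconds, microseconds) pair, which
# serializes cleanly without losing precision. The helper name below is
# hypothetical.
def _ExampleSplitTime(value):
  """Splits a float timestamp into (seconds, microseconds)."""
  seconds = int(value)
  return (seconds, int(round((value - seconds) * 1000000)))

# _ExampleSplitTime(1400000000.25) == (1400000000, 250000)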


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
    of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for the start of execution
  @ivar end_timestamp: the timestamp for the end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "processor_lock", "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]

  def AddReasons(self, pickup=False):
    """Extends the reason trail.

    Adds the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      if pickup:
        reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
      else:
        reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count += 1
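
  # Illustration (added by the editor, values hypothetical): each entry
  # appended to an opcode's reason trail above is a triple of the form
  #
  #   (reason_src, "job=1234;index=0", 1400000000000000000)
  #
  # i.e. a source computed by opcodes_base.NameToReasonSrc, a text encoding
  # this job's ID and the opcode's position, and utils.EpochNano()'s
  # nanosecond timestamp.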

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False
    self.livelock = None
    self.process_id = None

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restores a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived
    obj.livelock = state.get("livelock", None)
    obj.process_id = state.get("process_id", None)
    if obj.process_id is not None:
      obj.process_id = int(obj.process_id)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serializes the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      "livelock": self.livelock,
      "process_id": self.process_id,
      }

  def CalcStatus(self):
    """Computes the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with a status of:
          - waiting
          - canceling
          - running
        determines the job status
      - otherwise, it means either all opcodes are queued or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
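
  # Worked example (added for illustration): for opcode statuses
  # [success, running, queued] the loop above yields JOB_STATUS_RUNNING;
  # [success, error, queued] short-circuits to JOB_STATUS_ERROR; and
  # [success, success, success] leaves all_success True, giving
  # JOB_STATUS_SUCCESS.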

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
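
  # Note (added for clarity): priorities in this module follow a
  # niceness-like convention where numerically lower means more important
  # (cf. _OpExecContext.CheckPriorityIncrease below, which raises priority
  # by decrementing it), so min() above picks the most important unfinished
  # opcode's priority for the whole job.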

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks the job as canceled or canceling, if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
        as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
        and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)

  def SetPid(self, pid):
    """Sets the job's process ID.

    @type pid: int
    @param pid: the process ID

    """
    status = self.CalcStatus()

    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_WAITING):
      if self.process_id is not None:
        logging.warning("Replacing the process id %s of job %s with %s",
                        self.process_id, self.id, pid)
      self.process_id = pid
    else:
      logging.warning("Can set pid only for queued/waiting jobs")


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    super(_OpExecCallbacks, self).__init__()

    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
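

# Editor's sketch (not part of the original module): how the wrapper above is
# used. The timeout sequence below is made up; the real factory is
# mcpu.LockAttemptTimeoutStrategy and its NextAttempt method.
def _ExampleTimeoutUsage():
  """Demonstrates the Peek/Next semantics of _TimeoutStrategyWrapper."""
  timeouts = iter([1.0, 2.0])
  wrapper = _TimeoutStrategyWrapper(lambda: next(timeouts))
  assert wrapper.Peek() == 1.0   # Peek does not consume the value
  assert wrapper.Next() == 1.0   # Next returns it and clears the cache
  assert wrapper.Next() == 2.0   # the next value is fetched lazily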


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # The very last check if the job was cancelled before trying to execute
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # If we get here, we cannot afford to check for any consistency
          # any more, we just want to clean up.
          # TODO: Actually, it wouldn't be a bad idea to start a timer
          # here to kill the whole process.
          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True
        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name for a job and opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
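

# Editor's sketch (hypothetical, not part of the original module): exercising
# _JobDependencyManager with a stub status callback. A finalized dependency
# with a matching status yields CONTINUE, while an unfinished one yields WAIT
# and registers the job for later notification. The job IDs are made up.
def _ExampleDependencyCheck():
  statuses = {100: constants.JOB_STATUS_SUCCESS,
              101: constants.JOB_STATUS_RUNNING}
  mgr = _JobDependencyManager(statuses.__getitem__, None)

  class FakeJob:
    id = 102

  job = FakeJob()
  (res, _) = mgr.CheckAndRegister(job, 100, [constants.JOB_STATUS_SUCCESS])
  assert res == mgr.CONTINUE
  (res, _) = mgr.CheckAndRegister(job, 101, [constants.JOB_STATUS_SUCCESS])
  assert res == mgr.WAIT and mgr.JobWaiting(job)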


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context, cfg):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.primary_jid = None
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

  def _PickupJobUnlocked(self, job_id):
    """Loads a job from the job queue.

    Picks up a job that already is in the job queue and starts/resumes it.

    """
    if self.primary_jid:
      logging.warning("Job process asked to pick up %s, but already has %s",
                      job_id, self.primary_jid)

    self.primary_jid = int(job_id)

    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    job.AddReasons(pickup=True)

    status = job.CalcStatus()
    if status == constants.JOB_STATUS_QUEUED:
      job.SetPid(os.getpid())
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        job.SetPid(os.getpid())
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    self._PickupJobUnlocked(job_id)

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
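
  # Illustration of the TODO above (not in the original source): for
  # self._nodes == {"node1": "192.0.2.1"}, the suggested expression
  # tuple(map(list, zip(*{"node1": "192.0.2.1"}.items()))) evaluates to
  # (["node1"], ["192.0.2.1"]), i.e. the same (names, addresses) pair.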

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist
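
  # Editor's note (assumption about constants.JOB_FILE_RE, not verified from
  # this file): the regexp is expected to match names like "job-123" with the
  # numeric part in group 1, so the loop above turns "job-123" into the
  # integer 123 and ignores any other files in the directory.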

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      logging.debug("No data available for job %s", job_id)
      if int(job_id) == self.primary_jid:
        logging.warning("My own job file (%s) disappeared;"
                        " this should only happen at cluster destruction",
                        job_id)
        if mcpu.lusExecuting[0] == 0:
          logging.warning("Not in execution; cleaning up myself due to missing"
                          " job file")
          logging.shutdown()
          os._exit(1) # pylint: disable=W0212
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
1885 1886 @locking.ssynchronized(_LOCK)
1887 - def _EnqueueJobs(self, jobs):
1888 """Helper function to add jobs to worker pool's queue. 1889 1890 @type jobs: list 1891 @param jobs: List of all jobs 1892 1893 """ 1894 return self._EnqueueJobsUnlocked(jobs)
1895
1896 - def _EnqueueJobsUnlocked(self, jobs):
1897 """Helper function to add jobs to worker pool's queue. 1898 1899 @type jobs: list 1900 @param jobs: List of all jobs 1901 1902 """ 1903 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 1904 self._wpool.AddManyTasks([(job, ) for job in jobs], 1905 priority=[job.CalcPriority() for job in jobs], 1906 task_id=map(_GetIdAttr, jobs))
1907
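
The AddManyTasks call above fans one job list out into three parallel structures: one-element task tuples, a priority list, and a task ID list. A sketch with a dummy job class (the class and numeric values are placeholders, not ganeti objects):

  class DummyJob(object):
    def __init__(self, job_id, priority):
      self.id = job_id
      self._priority = priority

    def CalcPriority(self):
      return self._priority

  jobs = [DummyJob(11, 0), DummyJob(12, -10)]

  tasks = [(job, ) for job in jobs]                  # [(job11,), (job12,)]
  priorities = [job.CalcPriority() for job in jobs]  # [0, -10]
  task_ids = [job.id for job in jobs]                # [11, 12]

  assert len(tasks) == len(priorities) == len(task_ids)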
1908 - def _GetJobStatusForDependencies(self, job_id):
1909 """Gets the status of a job for dependencies. 1910 1911 @type job_id: int 1912 @param job_id: Job ID 1913 @raise errors.JobLost: If job can't be found 1914 1915 """ 1916 # Not using in-memory cache as doing so would require an exclusive lock 1917 1918 # Try to load from disk 1919 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1920 1921 if job: 1922 assert not job.writable, "Got writable job" # pylint: disable=E1101 1923 1924 if job: 1925 return job.CalcStatus() 1926 1927 raise errors.JobLost("Job %s not found" % job_id)
1928
1929 - def UpdateJobUnlocked(self, job, replicate=True):
1930 """Update a job's on disk storage. 1931 1932 After a job has been modified, this function needs to be called in 1933 order to write the changes to disk and replicate them to the other 1934 nodes. 1935 1936 @type job: L{_QueuedJob} 1937 @param job: the changed job 1938 @type replicate: boolean 1939 @param replicate: whether to replicate the change to remote nodes 1940 1941 """ 1942 if __debug__: 1943 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1944 assert (finalized ^ (job.end_timestamp is None)) 1945 assert job.writable, "Can't update read-only job" 1946 assert not job.archived, "Can't update archived job" 1947 1948 filename = self._GetJobPath(job.id) 1949 data = serializer.DumpJson(job.Serialize()) 1950 logging.debug("Writing job %s to %s", job.id, filename) 1951 self._UpdateJobQueueFile(filename, data, replicate)
1952
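
The XOR assertion in UpdateJobUnlocked encodes an invariant worth spelling out: a job is finalized exactly when it has an end timestamp. A minimal check of the two legal combinations:

  def check_invariant(finalized, end_timestamp):
    # Exactly one of "finalized" and "end_timestamp is None" may hold
    assert finalized ^ (end_timestamp is None)

  check_invariant(True, (1234567890, 0))  # finalized job with a timestamp
  check_invariant(False, None)            # running job without one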
1953 - def HasJobBeenFinalized(self, job_id):
1954 """Checks if a job has been finalized. 1955 1956 @type job_id: int 1957 @param job_id: Job identifier 1958 @rtype: boolean 1959 @return: True if the job has been finalized, 1960 False if the timeout has been reached, 1961 None if the job doesn't exist 1962 1963 """ 1964 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1965 if job is not None: 1966 return job.CalcStatus() in constants.JOBS_FINALIZED 1967 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: 1968 # FIXME: The above variable is a temporary workaround until the Python job 1969 # queue is completely removed. When removing the job queue, also remove 1970 # the variable from LUClusterDestroy. 1971 return True 1972 else: 1973 return None
1974 1975 @locking.ssynchronized(_LOCK)
1976 - def CancelJob(self, job_id):
1977 """Cancels a job. 1978 1979 This will only succeed if the job has not started yet. 1980 1981 @type job_id: int 1982 @param job_id: job ID of job to be cancelled. 1983 1984 """ 1985 logging.info("Cancelling job %s", job_id) 1986 1987 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1988 1989 @locking.ssynchronized(_LOCK)
1990 - def ChangeJobPriority(self, job_id, priority):
1991 """Changes a job's priority. 1992 1993 @type job_id: int 1994 @param job_id: ID of the job whose priority should be changed 1995 @type priority: int 1996 @param priority: New priority 1997 1998 """ 1999 logging.info("Changing priority of job %s to %s", job_id, priority) 2000 2001 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2002 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2003 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2004 (priority, allowed)) 2005 2006 def fn(job): 2007 (success, msg) = job.ChangePriority(priority) 2008 2009 if success: 2010 try: 2011 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2012 except workerpool.NoSuchTask: 2013 logging.debug("Job %s is not in workerpool at this time", job.id) 2014 2015 return (success, msg)
2016 2017 return self._ModifyJobUnlocked(job_id, fn)
2018
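
A toy version of the validation step above, with an assumed value set standing in for constants.OP_PRIO_SUBMIT_VALID:

  VALID_PRIORITIES = frozenset([-10, 0, 10])  # assumed values

  def check_priority(priority):
    if priority not in VALID_PRIORITIES:
      allowed = ", ".join(str(p) for p in sorted(VALID_PRIORITIES))
      raise ValueError("Invalid priority %s, allowed are %s"
                       % (priority, allowed))

  check_priority(0)      # accepted
  try:
    check_priority(99)   # rejected, listing the allowed values
  except ValueError:
    pass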
2019 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2020 """Modifies a job. 2021 2022 @type job_id: int 2023 @param job_id: Job ID 2024 @type mod_fn: callable 2025 @param mod_fn: Modifying function, receiving job object as parameter, 2026 returning tuple of (status boolean, message string) 2027 2028 """ 2029 job = self._LoadJobUnlocked(job_id) 2030 if not job: 2031 logging.debug("Job %s not found", job_id) 2032 return (False, "Job %s not found" % job_id) 2033 2034 assert job.writable, "Can't modify read-only job" 2035 assert not job.archived, "Can't modify archived job" 2036 2037 (success, msg) = mod_fn(job) 2038 2039 if success: 2040 # If the job was finalized (e.g. cancelled), this is the final write 2041 # allowed. The job can be archived anytime. 2042 self.UpdateJobUnlocked(job) 2043 2044 return (success, msg)
2045
2046 - def _ArchiveJobsUnlocked(self, jobs):
2047 """Archives jobs. 2048 2049 @type jobs: list of L{_QueuedJob} 2050 @param jobs: Job objects 2051 @rtype: int 2052 @return: Number of archived jobs 2053 2054 """ 2055 archive_jobs = [] 2056 rename_files = [] 2057 for job in jobs: 2058 assert job.writable, "Can't archive read-only job" 2059 assert not job.archived, "Can't cancel archived job" 2060 2061 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2062 logging.debug("Job %s is not yet done", job.id) 2063 continue 2064 2065 archive_jobs.append(job) 2066 2067 old = self._GetJobPath(job.id) 2068 new = self._GetArchivedJobPath(job.id) 2069 rename_files.append((old, new)) 2070 2071 # TODO: What if 1..n files fail to rename? 2072 self._RenameFilesUnlocked(rename_files) 2073 2074 logging.debug("Successfully archived job(s) %s", 2075 utils.CommaJoin(job.id for job in archive_jobs)) 2076 2077 # Since we haven't quite checked, above, if we succeeded or failed renaming 2078 # the files, we update the cached queue size from the filesystem. When we 2079 # get around to fix the TODO: above, we can use the number of actually 2080 # archived jobs to fix this. 2081 self._UpdateQueueSizeUnlocked() 2082 return len(archive_jobs)
2083
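
The selection logic above reduces to: keep only finalized jobs and map each live path to its archived path. A self-contained sketch; the status set and queue paths are illustrative assumptions, not the real layout:

  FINALIZED = frozenset(["success", "error", "canceled"])  # assumed statuses

  def archive_pairs(jobs):
    """jobs is a list of (job_id, status) tuples."""
    pairs = []
    for (job_id, status) in jobs:
      if status not in FINALIZED:
        continue  # job still running; skip it, as _ArchiveJobsUnlocked does
      old = "/var/lib/ganeti/queue/job-%s" % job_id          # assumed path
      new = "/var/lib/ganeti/queue/archive/job-%s" % job_id  # assumed path
      pairs.append((old, new))
    return pairs

  assert archive_pairs([(1, "success"), (2, "running")]) == \
    [("/var/lib/ganeti/queue/job-1", "/var/lib/ganeti/queue/archive/job-1")]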
2084 - def _Query(self, fields, qfilter):
2085 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2086 namefield="id") 2087 2088 # Archived jobs are only looked at if the "archived" field is referenced 2089 # either as a requested field or in the filter. By default archived jobs 2090 # are ignored. 2091 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2092 2093 job_ids = qobj.RequestedNames() 2094 2095 list_all = (job_ids is None) 2096 2097 if list_all: 2098 # Since files are added to/removed from the queue atomically, there's no 2099 # risk of getting the job ids in an inconsistent state. 2100 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2101 2102 jobs = [] 2103 2104 for job_id in job_ids: 2105 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2106 if job is not None or not list_all: 2107 jobs.append((job_id, job)) 2108 2109 return (qobj, jobs, list_all)
2110
2111 - def QueryJobs(self, fields, qfilter):
2112 """Returns a list of jobs in queue. 2113 2114 @type fields: sequence 2115 @param fields: List of wanted fields 2116 @type qfilter: None or query2 filter (list) 2117 @param qfilter: Query filter 2118 2119 """ 2120 (qobj, ctx, _) = self._Query(fields, qfilter) 2121 2122 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2123
2124 - def OldStyleQueryJobs(self, job_ids, fields):
2125 """Returns a list of jobs in queue. 2126 2127 @type job_ids: list 2128 @param job_ids: sequence of job identifiers or None for all 2129 @type fields: list 2130 @param fields: names of fields to return 2131 @rtype: list 2132 @return: list one element per job, each element being list with 2133 the requested fields 2134 2135 """ 2136 # backwards compat: 2137 job_ids = [int(jid) for jid in job_ids] 2138 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2139 2140 (qobj, ctx, _) = self._Query(fields, qfilter) 2141 2142 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2143 2144 @locking.ssynchronized(_LOCK)
2145 - def PrepareShutdown(self):
2146 """Prepare to stop the job queue. 2147 2148 Returns whether there are any jobs currently running. If the latter is the 2149 case, the job queue is not yet ready for shutdown. Once this function 2150 returns C{True} L{Shutdown} can be called without interfering with any job. 2151 2152 @rtype: bool 2153 @return: Whether there are any running jobs 2154 2155 """ 2156 return self._wpool.HasRunningTasks()
2157 2158 @locking.ssynchronized(_LOCK)
2159 - def Shutdown(self):
2160 """Stops the job queue. 2161 2162 This shutdowns all the worker threads an closes the queue. 2163 2164 """ 2165 self._wpool.TerminateWorkers()
2166