Package ganeti :: Package jqueue
[hide private]
[frames] | no frames]

Source Code for Package ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  34  used by all other classes in this module. 
  35   
  36  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  37      processing jobs 
  38   
  39  """ 
  40   
  41  import logging 
  42  import errno 
  43  import time 
  44  import weakref 
  45  import threading 
  46  import itertools 
  47  import operator 
  48  import os 
  49   
  50  try: 
  51    # pylint: disable=E0611 
  52    from pyinotify import pyinotify 
  53  except ImportError: 
  54    import pyinotify 
  55   
  56  from ganeti import asyncnotifier 
  57  from ganeti import constants 
  58  from ganeti import serializer 
  59  from ganeti import workerpool 
  60  from ganeti import locking 
  61  from ganeti import luxi 
  62  from ganeti import opcodes 
  63  from ganeti import opcodes_base 
  64  from ganeti import errors 
  65  from ganeti import mcpu 
  66  from ganeti import utils 
  67  from ganeti import jstore 
  68  import ganeti.rpc.node as rpc 
  69  from ganeti import runtime 
  70  from ganeti import netutils 
  71  from ganeti import compat 
  72  from ganeti import ht 
  73  from ganeti import query 
  74  from ganeti import qlang 
  75  from ganeti import pathutils 
  76  from ganeti import vcluster 
  77  from ganeti.cmdlib import cluster 
  78   
  79   
  80  JOBQUEUE_THREADS = 1 
  81   
  82  # member lock names to be passed to @ssynchronized decorator 
  83  _LOCK = "_lock" 
  84  _QUEUE = "_queue" 
  85   
  86  #: Retrieves "id" attribute 
  87  _GetIdAttr = operator.attrgetter("id") 
88 89 90 -class CancelJob(Exception):
91 """Special exception to cancel a job. 92 93 """
94
95 96 -def TimeStampNow():
97 """Returns the current timestamp. 98 99 @rtype: tuple 100 @return: the current time in the (seconds, microseconds) format 101 102 """ 103 return utils.SplitTime(time.time())
104
105 106 -def _CallJqUpdate(runner, names, file_name, content):
107 """Updates job queue file after virtualizing filename. 108 109 """ 110 virt_file_name = vcluster.MakeVirtualPath(file_name) 111 return runner.call_jobqueue_update(names, virt_file_name, content)
112
113 114 -class _QueuedOpCode(object):
115 """Encapsulates an opcode object. 116 117 @ivar log: holds the execution log and consists of tuples 118 of the form C{(log_serial, timestamp, level, message)} 119 @ivar input: the OpCode we encapsulate 120 @ivar status: the current status 121 @ivar result: the result of the LU execution 122 @ivar start_timestamp: timestamp for the start of the execution 123 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 124 @ivar stop_timestamp: timestamp for the end of the execution 125 126 """ 127 __slots__ = ["input", "status", "result", "log", "priority", 128 "start_timestamp", "exec_timestamp", "end_timestamp", 129 "__weakref__"] 130
131 - def __init__(self, op):
132 """Initializes instances of this class. 133 134 @type op: L{opcodes.OpCode} 135 @param op: the opcode we encapsulate 136 137 """ 138 self.input = op 139 self.status = constants.OP_STATUS_QUEUED 140 self.result = None 141 self.log = [] 142 self.start_timestamp = None 143 self.exec_timestamp = None 144 self.end_timestamp = None 145 146 # Get initial priority (it might change during the lifetime of this opcode) 147 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
148 149 @classmethod
150 - def Restore(cls, state):
151 """Restore the _QueuedOpCode from the serialized form. 152 153 @type state: dict 154 @param state: the serialized state 155 @rtype: _QueuedOpCode 156 @return: a new _QueuedOpCode instance 157 158 """ 159 obj = _QueuedOpCode.__new__(cls) 160 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 161 obj.status = state["status"] 162 obj.result = state["result"] 163 obj.log = state["log"] 164 obj.start_timestamp = state.get("start_timestamp", None) 165 obj.exec_timestamp = state.get("exec_timestamp", None) 166 obj.end_timestamp = state.get("end_timestamp", None) 167 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 168 return obj
169
170 - def Serialize(self):
171 """Serializes this _QueuedOpCode. 172 173 @rtype: dict 174 @return: the dictionary holding the serialized state 175 176 """ 177 return { 178 "input": self.input.__getstate__(), 179 "status": self.status, 180 "result": self.result, 181 "log": self.log, 182 "start_timestamp": self.start_timestamp, 183 "exec_timestamp": self.exec_timestamp, 184 "end_timestamp": self.end_timestamp, 185 "priority": self.priority, 186 }
187
188 189 -class _QueuedJob(object):
190 """In-memory job representation. 191 192 This is what we use to track the user-submitted jobs. Locking must 193 be taken care of by users of this class. 194 195 @type queue: L{JobQueue} 196 @ivar queue: the parent queue 197 @ivar id: the job ID 198 @type ops: list 199 @ivar ops: the list of _QueuedOpCode that constitute the job 200 @type log_serial: int 201 @ivar log_serial: holds the index for the next log entry 202 @ivar received_timestamp: the timestamp for when the job was received 203 @ivar start_timestmap: the timestamp for start of execution 204 @ivar end_timestamp: the timestamp for end of execution 205 @ivar writable: Whether the job is allowed to be modified 206 207 """ 208 # pylint: disable=W0212 209 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 210 "received_timestamp", "start_timestamp", "end_timestamp", 211 "processor_lock", "writable", "archived", 212 "livelock", "process_id", 213 "__weakref__"] 214
215 - def AddReasons(self, pickup=False):
216 """Extend the reason trail 217 218 Add the reason for all the opcodes of this job to be executed. 219 220 """ 221 count = 0 222 for queued_op in self.ops: 223 op = queued_op.input 224 if pickup: 225 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP 226 else: 227 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE 228 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, 229 reason_src_prefix) 230 reason_text = "job=%d;index=%d" % (self.id, count) 231 reason = getattr(op, "reason", []) 232 reason.append((reason_src, reason_text, utils.EpochNano())) 233 op.reason = reason 234 count = count + 1
235
236 - def __init__(self, queue, job_id, ops, writable):
237 """Constructor for the _QueuedJob. 238 239 @type queue: L{JobQueue} 240 @param queue: our parent queue 241 @type job_id: job_id 242 @param job_id: our job id 243 @type ops: list 244 @param ops: the list of opcodes we hold, which will be encapsulated 245 in _QueuedOpCodes 246 @type writable: bool 247 @param writable: Whether job can be modified 248 249 """ 250 if not ops: 251 raise errors.GenericError("A job needs at least one opcode") 252 253 self.queue = queue 254 self.id = int(job_id) 255 self.ops = [_QueuedOpCode(op) for op in ops] 256 self.AddReasons() 257 self.log_serial = 0 258 self.received_timestamp = TimeStampNow() 259 self.start_timestamp = None 260 self.end_timestamp = None 261 self.archived = False 262 self.livelock = None 263 self.process_id = None 264 265 self._InitInMemory(self, writable) 266 267 assert not self.archived, "New jobs can not be marked as archived"
268 269 @staticmethod
270 - def _InitInMemory(obj, writable):
271 """Initializes in-memory variables. 272 273 """ 274 obj.writable = writable 275 obj.ops_iter = None 276 obj.cur_opctx = None 277 278 # Read-only jobs are not processed and therefore don't need a lock 279 if writable: 280 obj.processor_lock = threading.Lock() 281 else: 282 obj.processor_lock = None
283
284 - def __repr__(self):
285 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 286 "id=%s" % self.id, 287 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 288 289 return "<%s at %#x>" % (" ".join(status), id(self))
290 291 @classmethod
292 - def Restore(cls, queue, state, writable, archived):
293 """Restore a _QueuedJob from serialized state: 294 295 @type queue: L{JobQueue} 296 @param queue: to which queue the restored job belongs 297 @type state: dict 298 @param state: the serialized state 299 @type writable: bool 300 @param writable: Whether job can be modified 301 @type archived: bool 302 @param archived: Whether job was already archived 303 @rtype: _JobQueue 304 @return: the restored _JobQueue instance 305 306 """ 307 obj = _QueuedJob.__new__(cls) 308 obj.queue = queue 309 obj.id = int(state["id"]) 310 obj.received_timestamp = state.get("received_timestamp", None) 311 obj.start_timestamp = state.get("start_timestamp", None) 312 obj.end_timestamp = state.get("end_timestamp", None) 313 obj.archived = archived 314 obj.livelock = state.get("livelock", None) 315 obj.process_id = state.get("process_id", None) 316 if obj.process_id is not None: 317 obj.process_id = int(obj.process_id) 318 319 obj.ops = [] 320 obj.log_serial = 0 321 for op_state in state["ops"]: 322 op = _QueuedOpCode.Restore(op_state) 323 for log_entry in op.log: 324 obj.log_serial = max(obj.log_serial, log_entry[0]) 325 obj.ops.append(op) 326 327 cls._InitInMemory(obj, writable) 328 329 return obj
330
331 - def Serialize(self):
332 """Serialize the _JobQueue instance. 333 334 @rtype: dict 335 @return: the serialized state 336 337 """ 338 return { 339 "id": self.id, 340 "ops": [op.Serialize() for op in self.ops], 341 "start_timestamp": self.start_timestamp, 342 "end_timestamp": self.end_timestamp, 343 "received_timestamp": self.received_timestamp, 344 "livelock": self.livelock, 345 "process_id": self.process_id, 346 }
347
348 - def CalcStatus(self):
349 """Compute the status of this job. 350 351 This function iterates over all the _QueuedOpCodes in the job and 352 based on their status, computes the job status. 353 354 The algorithm is: 355 - if we find a cancelled, or finished with error, the job 356 status will be the same 357 - otherwise, the last opcode with the status one of: 358 - waitlock 359 - canceling 360 - running 361 362 will determine the job status 363 364 - otherwise, it means either all opcodes are queued, or success, 365 and the job status will be the same 366 367 @return: the job status 368 369 """ 370 status = constants.JOB_STATUS_QUEUED 371 372 all_success = True 373 for op in self.ops: 374 if op.status == constants.OP_STATUS_SUCCESS: 375 continue 376 377 all_success = False 378 379 if op.status == constants.OP_STATUS_QUEUED: 380 pass 381 elif op.status == constants.OP_STATUS_WAITING: 382 status = constants.JOB_STATUS_WAITING 383 elif op.status == constants.OP_STATUS_RUNNING: 384 status = constants.JOB_STATUS_RUNNING 385 elif op.status == constants.OP_STATUS_CANCELING: 386 status = constants.JOB_STATUS_CANCELING 387 break 388 elif op.status == constants.OP_STATUS_ERROR: 389 status = constants.JOB_STATUS_ERROR 390 # The whole job fails if one opcode failed 391 break 392 elif op.status == constants.OP_STATUS_CANCELED: 393 status = constants.OP_STATUS_CANCELED 394 break 395 396 if all_success: 397 status = constants.JOB_STATUS_SUCCESS 398 399 return status
400
401 - def CalcPriority(self):
402 """Gets the current priority for this job. 403 404 Only unfinished opcodes are considered. When all are done, the default 405 priority is used. 406 407 @rtype: int 408 409 """ 410 priorities = [op.priority for op in self.ops 411 if op.status not in constants.OPS_FINALIZED] 412 413 if not priorities: 414 # All opcodes are done, assume default priority 415 return constants.OP_PRIO_DEFAULT 416 417 return min(priorities)
418
419 - def GetLogEntries(self, newer_than):
420 """Selectively returns the log entries. 421 422 @type newer_than: None or int 423 @param newer_than: if this is None, return all log entries, 424 otherwise return only the log entries with serial higher 425 than this value 426 @rtype: list 427 @return: the list of the log entries selected 428 429 """ 430 if newer_than is None: 431 serial = -1 432 else: 433 serial = newer_than 434 435 entries = [] 436 for op in self.ops: 437 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 438 439 return entries
440
441 - def MarkUnfinishedOps(self, status, result):
442 """Mark unfinished opcodes with a given status and result. 443 444 This is an utility function for marking all running or waiting to 445 be run opcodes with a given status. Opcodes which are already 446 finalised are not changed. 447 448 @param status: a given opcode status 449 @param result: the opcode result 450 451 """ 452 not_marked = True 453 for op in self.ops: 454 if op.status in constants.OPS_FINALIZED: 455 assert not_marked, "Finalized opcodes found after non-finalized ones" 456 continue 457 op.status = status 458 op.result = result 459 not_marked = False
460
461 - def Finalize(self):
462 """Marks the job as finalized. 463 464 """ 465 self.end_timestamp = TimeStampNow()
466
467 - def Cancel(self):
468 """Marks job as canceled/-ing if possible. 469 470 @rtype: tuple; (bool, string) 471 @return: Boolean describing whether job was successfully canceled or marked 472 as canceling and a text message 473 474 """ 475 status = self.CalcStatus() 476 477 if status == constants.JOB_STATUS_QUEUED: 478 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 479 "Job canceled by request") 480 self.Finalize() 481 return (True, "Job %s canceled" % self.id) 482 483 elif status == constants.JOB_STATUS_WAITING: 484 # The worker will notice the new status and cancel the job 485 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 486 return (True, "Job %s will be canceled" % self.id) 487 488 else: 489 logging.debug("Job %s is no longer waiting in the queue", self.id) 490 return (False, "Job %s is no longer waiting in the queue" % self.id)
491
492 - def ChangePriority(self, priority):
493 """Changes the job priority. 494 495 @type priority: int 496 @param priority: New priority 497 @rtype: tuple; (bool, string) 498 @return: Boolean describing whether job's priority was successfully changed 499 and a text message 500 501 """ 502 status = self.CalcStatus() 503 504 if status in constants.JOBS_FINALIZED: 505 return (False, "Job %s is finished" % self.id) 506 elif status == constants.JOB_STATUS_CANCELING: 507 return (False, "Job %s is cancelling" % self.id) 508 else: 509 assert status in (constants.JOB_STATUS_QUEUED, 510 constants.JOB_STATUS_WAITING, 511 constants.JOB_STATUS_RUNNING) 512 513 changed = False 514 for op in self.ops: 515 if (op.status == constants.OP_STATUS_RUNNING or 516 op.status in constants.OPS_FINALIZED): 517 assert not changed, \ 518 ("Found opcode for which priority should not be changed after" 519 " priority has been changed for previous opcodes") 520 continue 521 522 assert op.status in (constants.OP_STATUS_QUEUED, 523 constants.OP_STATUS_WAITING) 524 525 changed = True 526 527 # Set new priority (doesn't modify opcode input) 528 op.priority = priority 529 530 if changed: 531 return (True, ("Priorities of pending opcodes for job %s have been" 532 " changed to %s" % (self.id, priority))) 533 else: 534 return (False, "Job %s had no pending opcodes" % self.id)
535
536 - def SetPid(self, pid):
537 """Sets the job's process ID 538 539 @type pid: int 540 @param pid: the process ID 541 542 """ 543 status = self.CalcStatus() 544 545 if status in (constants.JOB_STATUS_QUEUED, 546 constants.JOB_STATUS_WAITING): 547 if self.process_id is not None: 548 logging.warning("Replacing the process id %s of job %s with %s", 549 self.process_id, self.id, pid) 550 self.process_id = pid 551 else: 552 logging.warning("Can set pid only for queued/waiting jobs")
553
554 555 -class _OpExecCallbacks(mcpu.OpExecCbBase):
556
557 - def __init__(self, queue, job, op):
558 """Initializes this class. 559 560 @type queue: L{JobQueue} 561 @param queue: Job queue 562 @type job: L{_QueuedJob} 563 @param job: Job object 564 @type op: L{_QueuedOpCode} 565 @param op: OpCode 566 567 """ 568 super(_OpExecCallbacks, self).__init__() 569 570 assert queue, "Queue is missing" 571 assert job, "Job is missing" 572 assert op, "Opcode is missing" 573 574 self._queue = queue 575 self._job = job 576 self._op = op
577
578 - def _CheckCancel(self):
579 """Raises an exception to cancel the job if asked to. 580 581 """ 582 # Cancel here if we were asked to 583 if self._op.status == constants.OP_STATUS_CANCELING: 584 logging.debug("Canceling opcode") 585 raise CancelJob()
586 587 @locking.ssynchronized(_QUEUE, shared=1)
588 - def NotifyStart(self):
589 """Mark the opcode as running, not lock-waiting. 590 591 This is called from the mcpu code as a notifier function, when the LU is 592 finally about to start the Exec() method. Of course, to have end-user 593 visible results, the opcode must be initially (before calling into 594 Processor.ExecOpCode) set to OP_STATUS_WAITING. 595 596 """ 597 assert self._op in self._job.ops 598 assert self._op.status in (constants.OP_STATUS_WAITING, 599 constants.OP_STATUS_CANCELING) 600 601 # Cancel here if we were asked to 602 self._CheckCancel() 603 604 logging.debug("Opcode is now running") 605 606 self._op.status = constants.OP_STATUS_RUNNING 607 self._op.exec_timestamp = TimeStampNow() 608 609 # And finally replicate the job status 610 self._queue.UpdateJobUnlocked(self._job)
611 612 @locking.ssynchronized(_QUEUE, shared=1)
613 - def NotifyRetry(self):
614 """Mark opcode again as lock-waiting. 615 616 This is called from the mcpu code just after calling PrepareRetry. 617 The opcode will now again acquire locks (more, hopefully). 618 619 """ 620 self._op.status = constants.OP_STATUS_WAITING 621 logging.debug("Opcode will be retried. Back to waiting.")
622 623 @locking.ssynchronized(_QUEUE, shared=1)
624 - def _AppendFeedback(self, timestamp, log_type, log_msg):
625 """Internal feedback append function, with locks 626 627 """ 628 self._job.log_serial += 1 629 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 630 self._queue.UpdateJobUnlocked(self._job, replicate=False)
631
632 - def Feedback(self, *args):
633 """Append a log entry. 634 635 """ 636 assert len(args) < 3 637 638 if len(args) == 1: 639 log_type = constants.ELOG_MESSAGE 640 log_msg = args[0] 641 else: 642 (log_type, log_msg) = args 643 644 # The time is split to make serialization easier and not lose 645 # precision. 646 timestamp = utils.SplitTime(time.time()) 647 self._AppendFeedback(timestamp, log_type, log_msg)
648
649 - def CurrentPriority(self):
650 """Returns current priority for opcode. 651 652 """ 653 assert self._op.status in (constants.OP_STATUS_WAITING, 654 constants.OP_STATUS_CANCELING) 655 656 # Cancel here if we were asked to 657 self._CheckCancel() 658 659 return self._op.priority
660
661 - def SubmitManyJobs(self, jobs):
662 """Submits jobs for processing. 663 664 See L{JobQueue.SubmitManyJobs}. 665 666 """ 667 # Locking is done in job queue 668 return self._queue.SubmitManyJobs(jobs)
669
670 671 -def _EncodeOpError(err):
672 """Encodes an error which occurred while processing an opcode. 673 674 """ 675 if isinstance(err, errors.GenericError): 676 to_encode = err 677 else: 678 to_encode = errors.OpExecError(str(err)) 679 680 return errors.EncodeException(to_encode)
681
682 683 -class _TimeoutStrategyWrapper:
684 - def __init__(self, fn):
685 """Initializes this class. 686 687 """ 688 self._fn = fn 689 self._next = None
690
691 - def _Advance(self):
692 """Gets the next timeout if necessary. 693 694 """ 695 if self._next is None: 696 self._next = self._fn()
697
698 - def Peek(self):
699 """Returns the next timeout. 700 701 """ 702 self._Advance() 703 return self._next
704
705 - def Next(self):
706 """Returns the current timeout and advances the internal state. 707 708 """ 709 self._Advance() 710 result = self._next 711 self._next = None 712 return result
713
714 715 -class _OpExecContext:
716 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
717 """Initializes this class. 718 719 """ 720 self.op = op 721 self.index = index 722 self.log_prefix = log_prefix 723 self.summary = op.input.Summary() 724 725 # Create local copy to modify 726 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 727 self.jobdeps = op.input.depends[:] 728 else: 729 self.jobdeps = None 730 731 self._timeout_strategy_factory = timeout_strategy_factory 732 self._ResetTimeoutStrategy()
733
734 - def _ResetTimeoutStrategy(self):
735 """Creates a new timeout strategy. 736 737 """ 738 self._timeout_strategy = \ 739 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
740
741 - def CheckPriorityIncrease(self):
742 """Checks whether priority can and should be increased. 743 744 Called when locks couldn't be acquired. 745 746 """ 747 op = self.op 748 749 # Exhausted all retries and next round should not use blocking acquire 750 # for locks? 751 if (self._timeout_strategy.Peek() is None and 752 op.priority > constants.OP_PRIO_HIGHEST): 753 logging.debug("Increasing priority") 754 op.priority -= 1 755 self._ResetTimeoutStrategy() 756 return True 757 758 return False
759
760 - def GetNextLockTimeout(self):
761 """Returns the next lock acquire timeout. 762 763 """ 764 return self._timeout_strategy.Next()
765
766 767 -class _JobProcessor(object):
768 (DEFER, 769 WAITDEP, 770 FINISHED) = range(1, 4) 771
772 - def __init__(self, queue, opexec_fn, job, 773 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
774 """Initializes this class. 775 776 """ 777 self.queue = queue 778 self.opexec_fn = opexec_fn 779 self.job = job 780 self._timeout_strategy_factory = _timeout_strategy_factory
781 782 @staticmethod
783 - def _FindNextOpcode(job, timeout_strategy_factory):
784 """Locates the next opcode to run. 785 786 @type job: L{_QueuedJob} 787 @param job: Job object 788 @param timeout_strategy_factory: Callable to create new timeout strategy 789 790 """ 791 # Create some sort of a cache to speed up locating next opcode for future 792 # lookups 793 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 794 # pending and one for processed ops. 795 if job.ops_iter is None: 796 job.ops_iter = enumerate(job.ops) 797 798 # Find next opcode to run 799 while True: 800 try: 801 (idx, op) = job.ops_iter.next() 802 except StopIteration: 803 raise errors.ProgrammerError("Called for a finished job") 804 805 if op.status == constants.OP_STATUS_RUNNING: 806 # Found an opcode already marked as running 807 raise errors.ProgrammerError("Called for job marked as running") 808 809 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 810 timeout_strategy_factory) 811 812 if op.status not in constants.OPS_FINALIZED: 813 return opctx 814 815 # This is a job that was partially completed before master daemon 816 # shutdown, so it can be expected that some opcodes are already 817 # completed successfully (if any did error out, then the whole job 818 # should have been aborted and not resubmitted for processing). 819 logging.info("%s: opcode %s already processed, skipping", 820 opctx.log_prefix, opctx.summary)
821 822 @staticmethod
823 - def _MarkWaitlock(job, op):
824 """Marks an opcode as waiting for locks. 825 826 The job's start timestamp is also set if necessary. 827 828 @type job: L{_QueuedJob} 829 @param job: Job object 830 @type op: L{_QueuedOpCode} 831 @param op: Opcode object 832 833 """ 834 assert op in job.ops 835 assert op.status in (constants.OP_STATUS_QUEUED, 836 constants.OP_STATUS_WAITING) 837 838 update = False 839 840 op.result = None 841 842 if op.status == constants.OP_STATUS_QUEUED: 843 op.status = constants.OP_STATUS_WAITING 844 update = True 845 846 if op.start_timestamp is None: 847 op.start_timestamp = TimeStampNow() 848 update = True 849 850 if job.start_timestamp is None: 851 job.start_timestamp = op.start_timestamp 852 update = True 853 854 assert op.status == constants.OP_STATUS_WAITING 855 856 return update
857 858 @staticmethod
859 - def _CheckDependencies(queue, job, opctx):
860 """Checks if an opcode has dependencies and if so, processes them. 861 862 @type queue: L{JobQueue} 863 @param queue: Queue object 864 @type job: L{_QueuedJob} 865 @param job: Job object 866 @type opctx: L{_OpExecContext} 867 @param opctx: Opcode execution context 868 @rtype: bool 869 @return: Whether opcode will be re-scheduled by dependency tracker 870 871 """ 872 op = opctx.op 873 874 result = False 875 876 while opctx.jobdeps: 877 (dep_job_id, dep_status) = opctx.jobdeps[0] 878 879 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 880 dep_status) 881 assert ht.TNonEmptyString(depmsg), "No dependency message" 882 883 logging.info("%s: %s", opctx.log_prefix, depmsg) 884 885 if depresult == _JobDependencyManager.CONTINUE: 886 # Remove dependency and continue 887 opctx.jobdeps.pop(0) 888 889 elif depresult == _JobDependencyManager.WAIT: 890 # Need to wait for notification, dependency tracker will re-add job 891 # to workerpool 892 result = True 893 break 894 895 elif depresult == _JobDependencyManager.CANCEL: 896 # Job was cancelled, cancel this job as well 897 job.Cancel() 898 assert op.status == constants.OP_STATUS_CANCELING 899 break 900 901 elif depresult in (_JobDependencyManager.WRONGSTATUS, 902 _JobDependencyManager.ERROR): 903 # Job failed or there was an error, this job must fail 904 op.status = constants.OP_STATUS_ERROR 905 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 906 break 907 908 else: 909 raise errors.ProgrammerError("Unknown dependency result '%s'" % 910 depresult) 911 912 return result
913
914 - def _ExecOpCodeUnlocked(self, opctx):
915 """Processes one opcode and returns the result. 916 917 """ 918 op = opctx.op 919 920 assert op.status in (constants.OP_STATUS_WAITING, 921 constants.OP_STATUS_CANCELING) 922 923 # The very last check if the job was cancelled before trying to execute 924 if op.status == constants.OP_STATUS_CANCELING: 925 return (constants.OP_STATUS_CANCELING, None) 926 927 timeout = opctx.GetNextLockTimeout() 928 929 try: 930 # Make sure not to hold queue lock while calling ExecOpCode 931 result = self.opexec_fn(op.input, 932 _OpExecCallbacks(self.queue, self.job, op), 933 timeout=timeout) 934 except mcpu.LockAcquireTimeout: 935 assert timeout is not None, "Received timeout for blocking acquire" 936 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 937 938 assert op.status in (constants.OP_STATUS_WAITING, 939 constants.OP_STATUS_CANCELING) 940 941 # Was job cancelled while we were waiting for the lock? 942 if op.status == constants.OP_STATUS_CANCELING: 943 return (constants.OP_STATUS_CANCELING, None) 944 945 # Stay in waitlock while trying to re-acquire lock 946 return (constants.OP_STATUS_WAITING, None) 947 except CancelJob: 948 logging.exception("%s: Canceling job", opctx.log_prefix) 949 assert op.status == constants.OP_STATUS_CANCELING 950 return (constants.OP_STATUS_CANCELING, None) 951 952 except Exception, err: # pylint: disable=W0703 953 logging.exception("%s: Caught exception in %s", 954 opctx.log_prefix, opctx.summary) 955 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 956 else: 957 logging.debug("%s: %s successful", 958 opctx.log_prefix, opctx.summary) 959 return (constants.OP_STATUS_SUCCESS, result)
960
961 - def __call__(self, _nextop_fn=None):
962 """Continues execution of a job. 963 964 @param _nextop_fn: Callback function for tests 965 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 966 be deferred and C{WAITDEP} if the dependency manager 967 (L{_JobDependencyManager}) will re-schedule the job when appropriate 968 969 """ 970 queue = self.queue 971 job = self.job 972 973 logging.debug("Processing job %s", job.id) 974 975 queue.acquire(shared=1) 976 try: 977 opcount = len(job.ops) 978 979 assert job.writable, "Expected writable job" 980 981 # Don't do anything for finalized jobs 982 if job.CalcStatus() in constants.JOBS_FINALIZED: 983 return self.FINISHED 984 985 # Is a previous opcode still pending? 986 if job.cur_opctx: 987 opctx = job.cur_opctx 988 job.cur_opctx = None 989 else: 990 if __debug__ and _nextop_fn: 991 _nextop_fn() 992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 993 994 op = opctx.op 995 996 # Consistency check 997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 998 constants.OP_STATUS_CANCELING) 999 for i in job.ops[opctx.index + 1:]) 1000 1001 assert op.status in (constants.OP_STATUS_QUEUED, 1002 constants.OP_STATUS_WAITING, 1003 constants.OP_STATUS_CANCELING) 1004 1005 assert (op.priority <= constants.OP_PRIO_LOWEST and 1006 op.priority >= constants.OP_PRIO_HIGHEST) 1007 1008 waitjob = None 1009 1010 if op.status != constants.OP_STATUS_CANCELING: 1011 assert op.status in (constants.OP_STATUS_QUEUED, 1012 constants.OP_STATUS_WAITING) 1013 1014 # Prepare to start opcode 1015 if self._MarkWaitlock(job, op): 1016 # Write to disk 1017 queue.UpdateJobUnlocked(job) 1018 1019 assert op.status == constants.OP_STATUS_WAITING 1020 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1021 assert job.start_timestamp and op.start_timestamp 1022 assert waitjob is None 1023 1024 # Check if waiting for a job is necessary 1025 waitjob = self._CheckDependencies(queue, job, opctx) 1026 1027 assert op.status in (constants.OP_STATUS_WAITING, 1028 constants.OP_STATUS_CANCELING, 1029 constants.OP_STATUS_ERROR) 1030 1031 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1032 constants.OP_STATUS_ERROR)): 1033 logging.info("%s: opcode %s waiting for locks", 1034 opctx.log_prefix, opctx.summary) 1035 1036 assert not opctx.jobdeps, "Not all dependencies were removed" 1037 1038 queue.release() 1039 try: 1040 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1041 finally: 1042 queue.acquire(shared=1) 1043 1044 op.status = op_status 1045 op.result = op_result 1046 1047 assert not waitjob 1048 1049 if op.status in (constants.OP_STATUS_WAITING, 1050 constants.OP_STATUS_QUEUED): 1051 # waiting: Couldn't get locks in time 1052 # queued: Queue is shutting down 1053 assert not op.end_timestamp 1054 else: 1055 # Finalize opcode 1056 op.end_timestamp = TimeStampNow() 1057 1058 if op.status == constants.OP_STATUS_CANCELING: 1059 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1060 for i in job.ops[opctx.index:]) 1061 else: 1062 assert op.status in constants.OPS_FINALIZED 1063 1064 if op.status == constants.OP_STATUS_QUEUED: 1065 # Queue is shutting down 1066 assert not waitjob 1067 1068 finalize = False 1069 1070 # Reset context 1071 job.cur_opctx = None 1072 1073 # In no case must the status be finalized here 1074 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1075 1076 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1077 finalize = False 1078 1079 if not waitjob and opctx.CheckPriorityIncrease(): 1080 # Priority was changed, need to update on-disk file 1081 queue.UpdateJobUnlocked(job) 1082 1083 # Keep around for another round 1084 job.cur_opctx = opctx 1085 1086 assert (op.priority <= constants.OP_PRIO_LOWEST and 1087 op.priority >= constants.OP_PRIO_HIGHEST) 1088 1089 # In no case must the status be finalized here 1090 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1091 1092 else: 1093 # Ensure all opcodes so far have been successful 1094 assert (opctx.index == 0 or 1095 compat.all(i.status == constants.OP_STATUS_SUCCESS 1096 for i in job.ops[:opctx.index])) 1097 1098 # Reset context 1099 job.cur_opctx = None 1100 1101 if op.status == constants.OP_STATUS_SUCCESS: 1102 finalize = False 1103 1104 elif op.status == constants.OP_STATUS_ERROR: 1105 # If we get here, we cannot afford to check for any consistency 1106 # any more, we just want to clean up. 1107 # TODO: Actually, it wouldn't be a bad idea to start a timer 1108 # here to kill the whole process. 1109 to_encode = errors.OpExecError("Preceding opcode failed") 1110 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1111 _EncodeOpError(to_encode)) 1112 finalize = True 1113 elif op.status == constants.OP_STATUS_CANCELING: 1114 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1115 "Job canceled by request") 1116 finalize = True 1117 1118 else: 1119 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1120 1121 if opctx.index == (opcount - 1): 1122 # Finalize on last opcode 1123 finalize = True 1124 1125 if finalize: 1126 # All opcodes have been run, finalize job 1127 job.Finalize() 1128 1129 # Write to disk. If the job status is final, this is the final write 1130 # allowed. Once the file has been written, it can be archived anytime. 1131 queue.UpdateJobUnlocked(job) 1132 1133 assert not waitjob 1134 1135 if finalize: 1136 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1137 return self.FINISHED 1138 1139 assert not waitjob or queue.depmgr.JobWaiting(job) 1140 1141 if waitjob: 1142 return self.WAITDEP 1143 else: 1144 return self.DEFER 1145 finally: 1146 assert job.writable, "Job became read-only while being processed" 1147 queue.release()
1148
1149 1150 -def _EvaluateJobProcessorResult(depmgr, job, result):
1151 """Looks at a result from L{_JobProcessor} for a job. 1152 1153 To be used in a L{_JobQueueWorker}. 1154 1155 """ 1156 if result == _JobProcessor.FINISHED: 1157 # Notify waiting jobs 1158 depmgr.NotifyWaiters(job.id) 1159 1160 elif result == _JobProcessor.DEFER: 1161 # Schedule again 1162 raise workerpool.DeferTask(priority=job.CalcPriority()) 1163 1164 elif result == _JobProcessor.WAITDEP: 1165 # No-op, dependency manager will re-schedule 1166 pass 1167 1168 else: 1169 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1170 (result, ))
1171
1172 1173 -class _JobQueueWorker(workerpool.BaseWorker):
1174 """The actual job workers. 1175 1176 """
1177 - def RunTask(self, job): # pylint: disable=W0221
1178 """Job executor. 1179 1180 @type job: L{_QueuedJob} 1181 @param job: the job to be processed 1182 1183 """ 1184 assert job.writable, "Expected writable job" 1185 1186 # Ensure only one worker is active on a single job. If a job registers for 1187 # a dependency job, and the other job notifies before the first worker is 1188 # done, the job can end up in the tasklist more than once. 1189 job.processor_lock.acquire() 1190 try: 1191 return self._RunTaskInner(job) 1192 finally: 1193 job.processor_lock.release()
1194
1195 - def _RunTaskInner(self, job):
1196 """Executes a job. 1197 1198 Must be called with per-job lock acquired. 1199 1200 """ 1201 queue = job.queue 1202 assert queue == self.pool.queue 1203 1204 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1205 setname_fn(None) 1206 1207 proc = mcpu.Processor(queue.context, job.id) 1208 1209 # Create wrapper for setting thread name 1210 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1211 proc.ExecOpCode) 1212 1213 _EvaluateJobProcessorResult(queue.depmgr, job, 1214 _JobProcessor(queue, wrap_execop_fn, job)())
1215 1216 @staticmethod
1217 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1218 """Updates the worker thread name to include a short summary of the opcode. 1219 1220 @param setname_fn: Callable setting worker thread name 1221 @param execop_fn: Callable for executing opcode (usually 1222 L{mcpu.Processor.ExecOpCode}) 1223 1224 """ 1225 setname_fn(op) 1226 try: 1227 return execop_fn(op, *args, **kwargs) 1228 finally: 1229 setname_fn(None)
1230 1231 @staticmethod
1232 - def _GetWorkerName(job, op):
1233 """Sets the worker thread name. 1234 1235 @type job: L{_QueuedJob} 1236 @type op: L{opcodes.OpCode} 1237 1238 """ 1239 parts = ["Job%s" % job.id] 1240 1241 if op: 1242 parts.append(op.TinySummary()) 1243 1244 return "/".join(parts)
1245
1246 1247 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1248 """Simple class implementing a job-processing workerpool. 1249 1250 """
1251 - def __init__(self, queue):
1252 super(_JobQueueWorkerPool, self).__init__("Jq", 1253 JOBQUEUE_THREADS, 1254 _JobQueueWorker) 1255 self.queue = queue
1256
1257 1258 -class _JobDependencyManager:
1259 """Keeps track of job dependencies. 1260 1261 """ 1262 (WAIT, 1263 ERROR, 1264 CANCEL, 1265 CONTINUE, 1266 WRONGSTATUS) = range(1, 6) 1267
1268 - def __init__(self, getstatus_fn, enqueue_fn):
1269 """Initializes this class. 1270 1271 """ 1272 self._getstatus_fn = getstatus_fn 1273 self._enqueue_fn = enqueue_fn 1274 1275 self._waiters = {} 1276 self._lock = locking.SharedLock("JobDepMgr")
1277 1278 @locking.ssynchronized(_LOCK, shared=1)
1279 - def GetLockInfo(self, requested): # pylint: disable=W0613
1280 """Retrieves information about waiting jobs. 1281 1282 @type requested: set 1283 @param requested: Requested information, see C{query.LQ_*} 1284 1285 """ 1286 # No need to sort here, that's being done by the lock manager and query 1287 # library. There are no priorities for notifying jobs, hence all show up as 1288 # one item under "pending". 1289 return [("job/%s" % job_id, None, None, 1290 [("job", [job.id for job in waiters])]) 1291 for job_id, waiters in self._waiters.items() 1292 if waiters]
1293 1294 @locking.ssynchronized(_LOCK, shared=1)
1295 - def JobWaiting(self, job):
1296 """Checks if a job is waiting. 1297 1298 """ 1299 return compat.any(job in jobs 1300 for jobs in self._waiters.values())
1301 1302 @locking.ssynchronized(_LOCK)
1303 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1304 """Checks if a dependency job has the requested status. 1305 1306 If the other job is not yet in a finalized status, the calling job will be 1307 notified (re-added to the workerpool) at a later point. 1308 1309 @type job: L{_QueuedJob} 1310 @param job: Job object 1311 @type dep_job_id: int 1312 @param dep_job_id: ID of dependency job 1313 @type dep_status: list 1314 @param dep_status: Required status 1315 1316 """ 1317 assert ht.TJobId(job.id) 1318 assert ht.TJobId(dep_job_id) 1319 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1320 1321 if job.id == dep_job_id: 1322 return (self.ERROR, "Job can't depend on itself") 1323 1324 # Get status of dependency job 1325 try: 1326 status = self._getstatus_fn(dep_job_id) 1327 except errors.JobLost, err: 1328 return (self.ERROR, "Dependency error: %s" % err) 1329 1330 assert status in constants.JOB_STATUS_ALL 1331 1332 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1333 1334 if status not in constants.JOBS_FINALIZED: 1335 # Register for notification and wait for job to finish 1336 job_id_waiters.add(job) 1337 return (self.WAIT, 1338 "Need to wait for job %s, wanted status '%s'" % 1339 (dep_job_id, dep_status)) 1340 1341 # Remove from waiters list 1342 if job in job_id_waiters: 1343 job_id_waiters.remove(job) 1344 1345 if (status == constants.JOB_STATUS_CANCELED and 1346 constants.JOB_STATUS_CANCELED not in dep_status): 1347 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1348 1349 elif not dep_status or status in dep_status: 1350 return (self.CONTINUE, 1351 "Dependency job %s finished with status '%s'" % 1352 (dep_job_id, status)) 1353 1354 else: 1355 return (self.WRONGSTATUS, 1356 "Dependency job %s finished with status '%s'," 1357 " not one of '%s' as required" % 1358 (dep_job_id, status, utils.CommaJoin(dep_status)))
1359
1360 - def _RemoveEmptyWaitersUnlocked(self):
1361 """Remove all jobs without actual waiters. 1362 1363 """ 1364 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1365 if not waiters]: 1366 del self._waiters[job_id]
1367
1368 - def NotifyWaiters(self, job_id):
1369 """Notifies all jobs waiting for a certain job ID. 1370 1371 @attention: Do not call until L{CheckAndRegister} returned a status other 1372 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1373 @type job_id: int 1374 @param job_id: Job ID 1375 1376 """ 1377 assert ht.TJobId(job_id) 1378 1379 self._lock.acquire() 1380 try: 1381 self._RemoveEmptyWaitersUnlocked() 1382 1383 jobs = self._waiters.pop(job_id, None) 1384 finally: 1385 self._lock.release() 1386 1387 if jobs: 1388 # Re-add jobs to workerpool 1389 logging.debug("Re-adding %s jobs which were waiting for job %s", 1390 len(jobs), job_id) 1391 self._enqueue_fn(jobs)
1392
1393 1394 -class JobQueue(object):
1395 """Queue used to manage the jobs. 1396 1397 """
1398 - def __init__(self, context, cfg):
1399 """Constructor for JobQueue. 1400 1401 The constructor will initialize the job queue object and then 1402 start loading the current jobs from disk, either for starting them 1403 (if they were queue) or for aborting them (if they were already 1404 running). 1405 1406 @type context: GanetiContext 1407 @param context: the context object for access to the configuration 1408 data and other ganeti objects 1409 1410 """ 1411 self.primary_jid = None 1412 self.context = context 1413 self._memcache = weakref.WeakValueDictionary() 1414 self._my_hostname = netutils.Hostname.GetSysName() 1415 1416 # The Big JobQueue lock. If a code block or method acquires it in shared 1417 # mode safe it must guarantee concurrency with all the code acquiring it in 1418 # shared mode, including itself. In order not to acquire it at all 1419 # concurrency must be guaranteed with all code acquiring it in shared mode 1420 # and all code acquiring it exclusively. 1421 self._lock = locking.SharedLock("JobQueue") 1422 1423 self.acquire = self._lock.acquire 1424 self.release = self._lock.release 1425 1426 # Read serial file 1427 self._last_serial = jstore.ReadSerial() 1428 assert self._last_serial is not None, ("Serial file was modified between" 1429 " check in jstore and here") 1430 1431 # Get initial list of nodes 1432 self._nodes = dict((n.name, n.primary_ip) 1433 for n in cfg.GetAllNodesInfo().values() 1434 if n.master_candidate) 1435 1436 # Remove master node 1437 self._nodes.pop(self._my_hostname, None) 1438 1439 # TODO: Check consistency across nodes 1440 1441 self._queue_size = None 1442 self._UpdateQueueSizeUnlocked() 1443 assert ht.TInt(self._queue_size) 1444 1445 # Job dependencies 1446 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1447 self._EnqueueJobs) 1448 1449 # Setup worker pool 1450 self._wpool = _JobQueueWorkerPool(self)
1451
1452 - def _PickupJobUnlocked(self, job_id):
1453 """Load a job from the job queue 1454 1455 Pick up a job that already is in the job queue and start/resume it. 1456 1457 """ 1458 if self.primary_jid: 1459 logging.warning("Job process asked to pick up %s, but already has %s", 1460 job_id, self.primary_jid) 1461 1462 self.primary_jid = int(job_id) 1463 1464 job = self._LoadJobUnlocked(job_id) 1465 1466 if job is None: 1467 logging.warning("Job %s could not be read", job_id) 1468 return 1469 1470 job.AddReasons(pickup=True) 1471 1472 status = job.CalcStatus() 1473 if status == constants.JOB_STATUS_QUEUED: 1474 job.SetPid(os.getpid()) 1475 self._EnqueueJobsUnlocked([job]) 1476 logging.info("Restarting job %s", job.id) 1477 1478 elif status in (constants.JOB_STATUS_RUNNING, 1479 constants.JOB_STATUS_WAITING, 1480 constants.JOB_STATUS_CANCELING): 1481 logging.warning("Unfinished job %s found: %s", job.id, job) 1482 1483 if status == constants.JOB_STATUS_WAITING: 1484 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1485 job.SetPid(os.getpid()) 1486 self._EnqueueJobsUnlocked([job]) 1487 logging.info("Restarting job %s", job.id) 1488 else: 1489 to_encode = errors.OpExecError("Unclean master daemon shutdown") 1490 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1491 _EncodeOpError(to_encode)) 1492 job.Finalize() 1493 1494 self.UpdateJobUnlocked(job)
1495 1496 @locking.ssynchronized(_LOCK)
1497 - def PickupJob(self, job_id):
1498 self._PickupJobUnlocked(job_id)
1499
1500 - def _GetRpc(self, address_list):
1501 """Gets RPC runner with context. 1502 1503 """ 1504 return rpc.JobQueueRunner(self.context, address_list)
1505 1506 @locking.ssynchronized(_LOCK)
1507 - def AddNode(self, node):
1508 """Register a new node with the queue. 1509 1510 @type node: L{objects.Node} 1511 @param node: the node object to be added 1512 1513 """ 1514 node_name = node.name 1515 assert node_name != self._my_hostname 1516 1517 # Clean queue directory on added node 1518 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1519 msg = result.fail_msg 1520 if msg: 1521 logging.warning("Cannot cleanup queue directory on node %s: %s", 1522 node_name, msg) 1523 1524 if not node.master_candidate: 1525 # remove if existing, ignoring errors 1526 self._nodes.pop(node_name, None) 1527 # and skip the replication of the job ids 1528 return 1529 1530 # Upload the whole queue excluding archived jobs 1531 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1532 1533 # Upload current serial file 1534 files.append(pathutils.JOB_QUEUE_SERIAL_FILE) 1535 1536 # Static address list 1537 addrs = [node.primary_ip] 1538 1539 for file_name in files: 1540 # Read file content 1541 content = utils.ReadFile(file_name) 1542 1543 result = _CallJqUpdate(self._GetRpc(addrs), [node_name], 1544 file_name, content) 1545 msg = result[node_name].fail_msg 1546 if msg: 1547 logging.error("Failed to upload file %s to node %s: %s", 1548 file_name, node_name, msg) 1549 1550 msg = result[node_name].fail_msg 1551 if msg: 1552 logging.error("Failed to set queue drained flag on node %s: %s", 1553 node_name, msg) 1554 1555 self._nodes[node_name] = node.primary_ip
1556 1557 @locking.ssynchronized(_LOCK)
1558 - def RemoveNode(self, node_name):
1559 """Callback called when removing nodes from the cluster. 1560 1561 @type node_name: str 1562 @param node_name: the name of the node to remove 1563 1564 """ 1565 self._nodes.pop(node_name, None)
1566 1567 @staticmethod
1568 - def _CheckRpcResult(result, nodes, failmsg):
1569 """Verifies the status of an RPC call. 1570 1571 Since we aim to keep consistency should this node (the current 1572 master) fail, we will log errors if our rpc fail, and especially 1573 log the case when more than half of the nodes fails. 1574 1575 @param result: the data as returned from the rpc call 1576 @type nodes: list 1577 @param nodes: the list of nodes we made the call to 1578 @type failmsg: str 1579 @param failmsg: the identifier to be used for logging 1580 1581 """ 1582 failed = [] 1583 success = [] 1584 1585 for node in nodes: 1586 msg = result[node].fail_msg 1587 if msg: 1588 failed.append(node) 1589 logging.error("RPC call %s (%s) failed on node %s: %s", 1590 result[node].call, failmsg, node, msg) 1591 else: 1592 success.append(node) 1593 1594 # +1 for the master node 1595 if (len(success) + 1) < len(failed): 1596 # TODO: Handle failing nodes 1597 logging.error("More than half of the nodes failed")
1598
1599 - def _GetNodeIp(self):
1600 """Helper for returning the node name/ip list. 1601 1602 @rtype: (list, list) 1603 @return: a tuple of two lists, the first one with the node 1604 names and the second one with the node addresses 1605 1606 """ 1607 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1608 name_list = self._nodes.keys() 1609 addr_list = [self._nodes[name] for name in name_list] 1610 return name_list, addr_list
1611
1612 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1613 """Writes a file locally and then replicates it to all nodes. 1614 1615 This function will replace the contents of a file on the local 1616 node and then replicate it to all the other nodes we have. 1617 1618 @type file_name: str 1619 @param file_name: the path of the file to be replicated 1620 @type data: str 1621 @param data: the new contents of the file 1622 @type replicate: boolean 1623 @param replicate: whether to spread the changes to the remote nodes 1624 1625 """ 1626 getents = runtime.GetEnts() 1627 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1628 gid=getents.daemons_gid, 1629 mode=constants.JOB_QUEUE_FILES_PERMS) 1630 1631 if replicate: 1632 names, addrs = self._GetNodeIp() 1633 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1634 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1635
1636 - def _RenameFilesUnlocked(self, rename):
1637 """Renames a file locally and then replicate the change. 1638 1639 This function will rename a file in the local queue directory 1640 and then replicate this rename to all the other nodes we have. 1641 1642 @type rename: list of (old, new) 1643 @param rename: List containing tuples mapping old to new names 1644 1645 """ 1646 # Rename them locally 1647 for old, new in rename: 1648 utils.RenameFile(old, new, mkdir=True) 1649 1650 # ... and on all nodes 1651 names, addrs = self._GetNodeIp() 1652 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1653 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1654 1655 @staticmethod
1656 - def _GetJobPath(job_id):
1657 """Returns the job file for a given job id. 1658 1659 @type job_id: str 1660 @param job_id: the job identifier 1661 @rtype: str 1662 @return: the path to the job file 1663 1664 """ 1665 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1666 1667 @staticmethod
1668 - def _GetArchivedJobPath(job_id):
1669 """Returns the archived job file for a give job id. 1670 1671 @type job_id: str 1672 @param job_id: the job identifier 1673 @rtype: str 1674 @return: the path to the archived job file 1675 1676 """ 1677 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1678 jstore.GetArchiveDirectory(job_id), 1679 "job-%s" % job_id)
1680 1681 @staticmethod
1682 - def _DetermineJobDirectories(archived):
1683 """Build list of directories containing job files. 1684 1685 @type archived: bool 1686 @param archived: Whether to include directories for archived jobs 1687 @rtype: list 1688 1689 """ 1690 result = [pathutils.QUEUE_DIR] 1691 1692 if archived: 1693 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 1694 result.extend(map(compat.partial(utils.PathJoin, archive_path), 1695 utils.ListVisibleFiles(archive_path))) 1696 1697 return result
1698 1699 @classmethod
1700 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1701 """Return all known job IDs. 1702 1703 The method only looks at disk because it's a requirement that all 1704 jobs are present on disk (so in the _memcache we don't have any 1705 extra IDs). 1706 1707 @type sort: boolean 1708 @param sort: perform sorting on the returned job ids 1709 @rtype: list 1710 @return: the list of job IDs 1711 1712 """ 1713 jlist = [] 1714 1715 for path in cls._DetermineJobDirectories(archived): 1716 for filename in utils.ListVisibleFiles(path): 1717 m = constants.JOB_FILE_RE.match(filename) 1718 if m: 1719 jlist.append(int(m.group(1))) 1720 1721 if sort: 1722 jlist.sort() 1723 return jlist
1724
1725 - def _LoadJobUnlocked(self, job_id):
1726 """Loads a job from the disk or memory. 1727 1728 Given a job id, this will return the cached job object if 1729 existing, or try to load the job from the disk. If loading from 1730 disk, it will also add the job to the cache. 1731 1732 @type job_id: int 1733 @param job_id: the job id 1734 @rtype: L{_QueuedJob} or None 1735 @return: either None or the job object 1736 1737 """ 1738 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 1739 1740 job = self._memcache.get(job_id, None) 1741 if job: 1742 logging.debug("Found job %s in memcache", job_id) 1743 assert job.writable, "Found read-only job in memcache" 1744 return job 1745 1746 try: 1747 job = self._LoadJobFromDisk(job_id, False) 1748 if job is None: 1749 return job 1750 except errors.JobFileCorrupted: 1751 old_path = self._GetJobPath(job_id) 1752 new_path = self._GetArchivedJobPath(job_id) 1753 if old_path == new_path: 1754 # job already archived (future case) 1755 logging.exception("Can't parse job %s", job_id) 1756 else: 1757 # non-archived case 1758 logging.exception("Can't parse job %s, will archive.", job_id) 1759 self._RenameFilesUnlocked([(old_path, new_path)]) 1760 return None 1761 1762 assert job.writable, "Job just loaded is not writable" 1763 1764 self._memcache[job_id] = job 1765 logging.debug("Added job %s to the cache", job_id) 1766 return job
1767
1768 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1769 """Load the given job file from disk. 1770 1771 Given a job file, read, load and restore it in a _QueuedJob format. 1772 1773 @type job_id: int 1774 @param job_id: job identifier 1775 @type try_archived: bool 1776 @param try_archived: Whether to try loading an archived job 1777 @rtype: L{_QueuedJob} or None 1778 @return: either None or the job object 1779 1780 """ 1781 path_functions = [(self._GetJobPath, False)] 1782 1783 if try_archived: 1784 path_functions.append((self._GetArchivedJobPath, True)) 1785 1786 raw_data = None 1787 archived = None 1788 1789 for (fn, archived) in path_functions: 1790 filepath = fn(job_id) 1791 logging.debug("Loading job from %s", filepath) 1792 try: 1793 raw_data = utils.ReadFile(filepath) 1794 except EnvironmentError, err: 1795 if err.errno != errno.ENOENT: 1796 raise 1797 else: 1798 break 1799 1800 if not raw_data: 1801 logging.debug("No data available for job %s", job_id) 1802 if int(job_id) == self.primary_jid: 1803 logging.warning("My own job file (%s) disappeared;" 1804 " this should only happy at cluster desctruction", 1805 job_id) 1806 if mcpu.lusExecuting[0] == 0: 1807 logging.warning("Not in execution; cleaning up myself due to missing" 1808 " job file") 1809 logging.shutdown() 1810 os._exit(1) # pylint: disable=W0212 1811 return None 1812 1813 if writable is None: 1814 writable = not archived 1815 1816 try: 1817 data = serializer.LoadJson(raw_data) 1818 job = _QueuedJob.Restore(self, data, writable, archived) 1819 except Exception, err: # pylint: disable=W0703 1820 raise errors.JobFileCorrupted(err) 1821 1822 return job
1823
1824 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1825 """Load the given job file from disk. 1826 1827 Given a job file, read, load and restore it in a _QueuedJob format. 1828 In case of error reading the job, it gets returned as None, and the 1829 exception is logged. 1830 1831 @type job_id: int 1832 @param job_id: job identifier 1833 @type try_archived: bool 1834 @param try_archived: Whether to try loading an archived job 1835 @rtype: L{_QueuedJob} or None 1836 @return: either None or the job object 1837 1838 """ 1839 try: 1840 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 1841 except (errors.JobFileCorrupted, EnvironmentError): 1842 logging.exception("Can't load/parse job %s", job_id) 1843 return None
1844
1845 - def _UpdateQueueSizeUnlocked(self):
1846 """Update the queue size. 1847 1848 """ 1849 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1850 1851 @classmethod
1852 - def SubmitManyJobs(cls, jobs):
1853 """Create and store multiple jobs. 1854 1855 """ 1856 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1857 1858 @staticmethod
1859 - def _FormatSubmitError(msg, ops):
1860 """Formats errors which occurred while submitting a job. 1861 1862 """ 1863 return ("%s; opcodes %s" % 1864 (msg, utils.CommaJoin(op.Summary() for op in ops)))
1865 1866 @staticmethod
1867 - def _ResolveJobDependencies(resolve_fn, deps):
1868 """Resolves relative job IDs in dependencies. 1869 1870 @type resolve_fn: callable 1871 @param resolve_fn: Function to resolve a relative job ID 1872 @type deps: list 1873 @param deps: Dependencies 1874 @rtype: tuple; (boolean, string or list) 1875 @return: If successful (first tuple item), the returned list contains 1876 resolved job IDs along with the requested status; if not successful, 1877 the second element is an error message 1878 1879 """ 1880 result = [] 1881 1882 for (dep_job_id, dep_status) in deps: 1883 if ht.TRelativeJobId(dep_job_id): 1884 assert ht.TInt(dep_job_id) and dep_job_id < 0 1885 try: 1886 job_id = resolve_fn(dep_job_id) 1887 except IndexError: 1888 # Abort 1889 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 1890 else: 1891 job_id = dep_job_id 1892 1893 result.append((job_id, dep_status)) 1894 1895 return (True, result)
1896 1897 @locking.ssynchronized(_LOCK)
1898 - def _EnqueueJobs(self, jobs):
1899 """Helper function to add jobs to worker pool's queue. 1900 1901 @type jobs: list 1902 @param jobs: List of all jobs 1903 1904 """ 1905 return self._EnqueueJobsUnlocked(jobs)
1906
1907 - def _EnqueueJobsUnlocked(self, jobs):
1908 """Helper function to add jobs to worker pool's queue. 1909 1910 @type jobs: list 1911 @param jobs: List of all jobs 1912 1913 """ 1914 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 1915 self._wpool.AddManyTasks([(job, ) for job in jobs], 1916 priority=[job.CalcPriority() for job in jobs], 1917 task_id=map(_GetIdAttr, jobs))
1918
1919 - def _GetJobStatusForDependencies(self, job_id):
1920 """Gets the status of a job for dependencies. 1921 1922 @type job_id: int 1923 @param job_id: Job ID 1924 @raise errors.JobLost: If job can't be found 1925 1926 """ 1927 # Not using in-memory cache as doing so would require an exclusive lock 1928 1929 # Try to load from disk 1930 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1931 1932 if job: 1933 assert not job.writable, "Got writable job" # pylint: disable=E1101 1934 1935 if job: 1936 return job.CalcStatus() 1937 1938 raise errors.JobLost("Job %s not found" % job_id)
1939
1940 - def UpdateJobUnlocked(self, job, replicate=True):
1941 """Update a job's on disk storage. 1942 1943 After a job has been modified, this function needs to be called in 1944 order to write the changes to disk and replicate them to the other 1945 nodes. 1946 1947 @type job: L{_QueuedJob} 1948 @param job: the changed job 1949 @type replicate: boolean 1950 @param replicate: whether to replicate the change to remote nodes 1951 1952 """ 1953 if __debug__: 1954 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1955 assert (finalized ^ (job.end_timestamp is None)) 1956 assert job.writable, "Can't update read-only job" 1957 assert not job.archived, "Can't update archived job" 1958 1959 filename = self._GetJobPath(job.id) 1960 data = serializer.DumpJson(job.Serialize()) 1961 logging.debug("Writing job %s to %s", job.id, filename) 1962 self._UpdateJobQueueFile(filename, data, replicate)
1963
1964 - def HasJobBeenFinalized(self, job_id):
1965 """Checks if a job has been finalized. 1966 1967 @type job_id: int 1968 @param job_id: Job identifier 1969 @rtype: boolean 1970 @return: True if the job has been finalized, 1971 False if the timeout has been reached, 1972 None if the job doesn't exist 1973 1974 """ 1975 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1976 if job is not None: 1977 return job.CalcStatus() in constants.JOBS_FINALIZED 1978 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: 1979 # FIXME: The above variable is a temporary workaround until the Python job 1980 # queue is completely removed. When removing the job queue, also remove 1981 # the variable from LUClusterDestroy. 1982 return True 1983 else: 1984 return None
1985 1986 @locking.ssynchronized(_LOCK)
1987 - def CancelJob(self, job_id):
1988 """Cancels a job. 1989 1990 This will only succeed if the job has not started yet. 1991 1992 @type job_id: int 1993 @param job_id: job ID of job to be cancelled. 1994 1995 """ 1996 logging.info("Cancelling job %s", job_id) 1997 1998 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1999 2000 @locking.ssynchronized(_LOCK)
2001 - def ChangeJobPriority(self, job_id, priority):
2002 """Changes a job's priority. 2003 2004 @type job_id: int 2005 @param job_id: ID of the job whose priority should be changed 2006 @type priority: int 2007 @param priority: New priority 2008 2009 """ 2010 logging.info("Changing priority of job %s to %s", job_id, priority) 2011 2012 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2013 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2014 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2015 (priority, allowed)) 2016 2017 def fn(job): 2018 (success, msg) = job.ChangePriority(priority) 2019 2020 if success: 2021 try: 2022 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2023 except workerpool.NoSuchTask: 2024 logging.debug("Job %s is not in workerpool at this time", job.id) 2025 2026 return (success, msg)
2027 2028 return self._ModifyJobUnlocked(job_id, fn)
2029
2030 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2031 """Modifies a job. 2032 2033 @type job_id: int 2034 @param job_id: Job ID 2035 @type mod_fn: callable 2036 @param mod_fn: Modifying function, receiving job object as parameter, 2037 returning tuple of (status boolean, message string) 2038 2039 """ 2040 job = self._LoadJobUnlocked(job_id) 2041 if not job: 2042 logging.debug("Job %s not found", job_id) 2043 return (False, "Job %s not found" % job_id) 2044 2045 assert job.writable, "Can't modify read-only job" 2046 assert not job.archived, "Can't modify archived job" 2047 2048 (success, msg) = mod_fn(job) 2049 2050 if success: 2051 # If the job was finalized (e.g. cancelled), this is the final write 2052 # allowed. The job can be archived anytime. 2053 self.UpdateJobUnlocked(job) 2054 2055 return (success, msg)
2056
2057 - def _ArchiveJobsUnlocked(self, jobs):
2058 """Archives jobs. 2059 2060 @type jobs: list of L{_QueuedJob} 2061 @param jobs: Job objects 2062 @rtype: int 2063 @return: Number of archived jobs 2064 2065 """ 2066 archive_jobs = [] 2067 rename_files = [] 2068 for job in jobs: 2069 assert job.writable, "Can't archive read-only job" 2070 assert not job.archived, "Can't cancel archived job" 2071 2072 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2073 logging.debug("Job %s is not yet done", job.id) 2074 continue 2075 2076 archive_jobs.append(job) 2077 2078 old = self._GetJobPath(job.id) 2079 new = self._GetArchivedJobPath(job.id) 2080 rename_files.append((old, new)) 2081 2082 # TODO: What if 1..n files fail to rename? 2083 self._RenameFilesUnlocked(rename_files) 2084 2085 logging.debug("Successfully archived job(s) %s", 2086 utils.CommaJoin(job.id for job in archive_jobs)) 2087 2088 # Since we haven't quite checked, above, if we succeeded or failed renaming 2089 # the files, we update the cached queue size from the filesystem. When we 2090 # get around to fix the TODO: above, we can use the number of actually 2091 # archived jobs to fix this. 2092 self._UpdateQueueSizeUnlocked() 2093 return len(archive_jobs)
2094
2095 - def _Query(self, fields, qfilter):
2096 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2097 namefield="id") 2098 2099 # Archived jobs are only looked at if the "archived" field is referenced 2100 # either as a requested field or in the filter. By default archived jobs 2101 # are ignored. 2102 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2103 2104 job_ids = qobj.RequestedNames() 2105 2106 list_all = (job_ids is None) 2107 2108 if list_all: 2109 # Since files are added to/removed from the queue atomically, there's no 2110 # risk of getting the job ids in an inconsistent state. 2111 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2112 2113 jobs = [] 2114 2115 for job_id in job_ids: 2116 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2117 if job is not None or not list_all: 2118 jobs.append((job_id, job)) 2119 2120 return (qobj, jobs, list_all)
2121
2122 - def QueryJobs(self, fields, qfilter):
2123 """Returns a list of jobs in queue. 2124 2125 @type fields: sequence 2126 @param fields: List of wanted fields 2127 @type qfilter: None or query2 filter (list) 2128 @param qfilter: Query filter 2129 2130 """ 2131 (qobj, ctx, _) = self._Query(fields, qfilter) 2132 2133 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2134
2135 - def OldStyleQueryJobs(self, job_ids, fields):
2136 """Returns a list of jobs in queue. 2137 2138 @type job_ids: list 2139 @param job_ids: sequence of job identifiers or None for all 2140 @type fields: list 2141 @param fields: names of fields to return 2142 @rtype: list 2143 @return: list one element per job, each element being list with 2144 the requested fields 2145 2146 """ 2147 # backwards compat: 2148 job_ids = [int(jid) for jid in job_ids] 2149 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2150 2151 (qobj, ctx, _) = self._Query(fields, qfilter) 2152 2153 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2154 2155 @locking.ssynchronized(_LOCK)
2156 - def PrepareShutdown(self):
2157 """Prepare to stop the job queue. 2158 2159 Returns whether there are any jobs currently running. If the latter is the 2160 case, the job queue is not yet ready for shutdown. Once this function 2161 returns C{True} L{Shutdown} can be called without interfering with any job. 2162 2163 @rtype: bool 2164 @return: Whether there are any running jobs 2165 2166 """ 2167 return self._wpool.HasRunningTasks()
2168 2169 @locking.ssynchronized(_LOCK)
2170 - def Shutdown(self):
2171 """Stops the job queue. 2172 2173 This shutdowns all the worker threads an closes the queue. 2174 2175 """ 2176 self._wpool.TerminateWorkers()
2177