
Source Code for Package ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Module implementing the job queue handling. 
  32   
  33  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  34  used by all other classes in this module. 
  35   
  36  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  37      processing jobs 
  38   
  39  """ 
  40   
  41  import logging 
  42  import errno 
  43  import time 
  44  import weakref 
  45  import threading 
  46  import itertools 
  47  import operator 
  48  import os 
  49   
  50  try: 
  51    # pylint: disable=E0611 
  52    from pyinotify import pyinotify 
  53  except ImportError: 
  54    import pyinotify 
  55   
  56  from ganeti import asyncnotifier 
  57  from ganeti import constants 
  58  from ganeti import serializer 
  59  from ganeti import workerpool 
  60  from ganeti import locking 
  61  from ganeti import luxi 
  62  from ganeti import opcodes 
  63  from ganeti import opcodes_base 
  64  from ganeti import errors 
  65  from ganeti import mcpu 
  66  from ganeti import utils 
  67  from ganeti import jstore 
  68  import ganeti.rpc.node as rpc 
  69  from ganeti import runtime 
  70  from ganeti import netutils 
  71  from ganeti import compat 
  72  from ganeti import ht 
  73  from ganeti import query 
  74  from ganeti import qlang 
  75  from ganeti import pathutils 
  76  from ganeti import vcluster 
  77  from ganeti.cmdlib import cluster 
  78   
  79   
  80  JOBQUEUE_THREADS = 1 
  81   
  82  # member lock names to be passed to @ssynchronized decorator 
  83  _LOCK = "_lock" 
  84  _QUEUE = "_queue" 
  85   
  86  #: Retrieves "id" attribute 
  87  _GetIdAttr = operator.attrgetter("id") 
88 89 90 -class CancelJob(Exception):
91 """Special exception to cancel a job. 92 93 """
94
95 96 -def TimeStampNow():
97 """Returns the current timestamp. 98 99 @rtype: tuple 100 @return: the current time in the (seconds, microseconds) format 101 102 """ 103 return utils.SplitTime(time.time())
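For illustration, here is a minimal standalone sketch of the (seconds, microseconds) format returned above; the helper below is a hypothetical stand-in for utils.SplitTime, not the ganeti implementation:

import time

def split_time(value):
  # Split a floating-point timestamp into whole seconds and the remaining
  # microseconds, mirroring the tuple format used for job timestamps.
  seconds = int(value)
  return (seconds, int((value - seconds) * 1000000))

stamp = split_time(time.time())
assert isinstance(stamp, tuple) and len(stamp) == 2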
104
105 106 -def _CallJqUpdate(runner, names, file_name, content):
107 """Updates job queue file after virtualizing filename. 108 109 """ 110 virt_file_name = vcluster.MakeVirtualPath(file_name) 111 return runner.call_jobqueue_update(names, virt_file_name, content)
112
113 114 -class _QueuedOpCode(object):
115 """Encapsulates an opcode object. 116 117 @ivar log: holds the execution log and consists of tuples 118 of the form C{(log_serial, timestamp, level, message)} 119 @ivar input: the OpCode we encapsulate 120 @ivar status: the current status 121 @ivar result: the result of the LU execution 122 @ivar start_timestamp: timestamp for the start of the execution 123 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 124 @ivar stop_timestamp: timestamp for the end of the execution 125 126 """ 127 __slots__ = ["input", "status", "result", "log", "priority", 128 "start_timestamp", "exec_timestamp", "end_timestamp", 129 "__weakref__"] 130
131 - def __init__(self, op):
132 """Initializes instances of this class. 133 134 @type op: L{opcodes.OpCode} 135 @param op: the opcode we encapsulate 136 137 """ 138 self.input = op 139 self.status = constants.OP_STATUS_QUEUED 140 self.result = None 141 self.log = [] 142 self.start_timestamp = None 143 self.exec_timestamp = None 144 self.end_timestamp = None 145 146 # Get initial priority (it might change during the lifetime of this opcode) 147 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
148 149 @classmethod
150 - def Restore(cls, state):
151 """Restore the _QueuedOpCode from the serialized form. 152 153 @type state: dict 154 @param state: the serialized state 155 @rtype: _QueuedOpCode 156 @return: a new _QueuedOpCode instance 157 158 """ 159 obj = _QueuedOpCode.__new__(cls) 160 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 161 obj.status = state["status"] 162 obj.result = state["result"] 163 obj.log = state["log"] 164 obj.start_timestamp = state.get("start_timestamp", None) 165 obj.exec_timestamp = state.get("exec_timestamp", None) 166 obj.end_timestamp = state.get("end_timestamp", None) 167 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 168 return obj
169
170 - def Serialize(self):
171 """Serializes this _QueuedOpCode. 172 173 @rtype: dict 174 @return: the dictionary holding the serialized state 175 176 """ 177 return { 178 "input": self.input.__getstate__(), 179 "status": self.status, 180 "result": self.result, 181 "log": self.log, 182 "start_timestamp": self.start_timestamp, 183 "exec_timestamp": self.exec_timestamp, 184 "end_timestamp": self.end_timestamp, 185 "priority": self.priority, 186 }
187
188 189 -class _QueuedJob(object):
190 """In-memory job representation. 191 192 This is what we use to track the user-submitted jobs. Locking must 193 be taken care of by users of this class. 194 195 @type queue: L{JobQueue} 196 @ivar queue: the parent queue 197 @ivar id: the job ID 198 @type ops: list 199 @ivar ops: the list of _QueuedOpCode that constitute the job 200 @type log_serial: int 201 @ivar log_serial: holds the index for the next log entry 202 @ivar received_timestamp: the timestamp for when the job was received 203 @ivar start_timestamp: the timestamp for start of execution 204 @ivar end_timestamp: the timestamp for end of execution 205 @ivar writable: Whether the job is allowed to be modified 206 207 """ 208 # pylint: disable=W0212 209 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 210 "received_timestamp", "start_timestamp", "end_timestamp", 211 "processor_lock", "writable", "archived", 212 "livelock", "process_id", 213 "__weakref__"] 214
215 - def AddReasons(self, pickup=False):
216 """Extend the reason trail 217 218 Add the reason for all the opcodes of this job to be executed. 219 220 """ 221 count = 0 222 for queued_op in self.ops: 223 op = queued_op.input 224 if pickup: 225 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP 226 else: 227 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE 228 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, 229 reason_src_prefix) 230 reason_text = "job=%d;index=%d" % (self.id, count) 231 reason = getattr(op, "reason", []) 232 reason.append((reason_src, reason_text, utils.EpochNano())) 233 op.reason = reason 234 count = count + 1
235
236 - def __init__(self, queue, job_id, ops, writable):
237 """Constructor for the _QueuedJob. 238 239 @type queue: L{JobQueue} 240 @param queue: our parent queue 241 @type job_id: job_id 242 @param job_id: our job id 243 @type ops: list 244 @param ops: the list of opcodes we hold, which will be encapsulated 245 in _QueuedOpCodes 246 @type writable: bool 247 @param writable: Whether job can be modified 248 249 """ 250 if not ops: 251 raise errors.GenericError("A job needs at least one opcode") 252 253 self.queue = queue 254 self.id = int(job_id) 255 self.ops = [_QueuedOpCode(op) for op in ops] 256 self.AddReasons() 257 self.log_serial = 0 258 self.received_timestamp = TimeStampNow() 259 self.start_timestamp = None 260 self.end_timestamp = None 261 self.archived = False 262 self.livelock = None 263 self.process_id = None 264 265 self._InitInMemory(self, writable) 266 267 assert not self.archived, "New jobs can not be marked as archived"
268 269 @staticmethod
270 - def _InitInMemory(obj, writable):
271 """Initializes in-memory variables. 272 273 """ 274 obj.writable = writable 275 obj.ops_iter = None 276 obj.cur_opctx = None 277 278 # Read-only jobs are not processed and therefore don't need a lock 279 if writable: 280 obj.processor_lock = threading.Lock() 281 else: 282 obj.processor_lock = None
283
284 - def __repr__(self):
285 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 286 "id=%s" % self.id, 287 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 288 289 return "<%s at %#x>" % (" ".join(status), id(self))
290 291 @classmethod
292 - def Restore(cls, queue, state, writable, archived):
293 """Restore a _QueuedJob from serialized state. 294 295 @type queue: L{JobQueue} 296 @param queue: to which queue the restored job belongs 297 @type state: dict 298 @param state: the serialized state 299 @type writable: bool 300 @param writable: Whether job can be modified 301 @type archived: bool 302 @param archived: Whether job was already archived 303 @rtype: _QueuedJob 304 @return: the restored _QueuedJob instance 305 306 """ 307 obj = _QueuedJob.__new__(cls) 308 obj.queue = queue 309 obj.id = int(state["id"]) 310 obj.received_timestamp = state.get("received_timestamp", None) 311 obj.start_timestamp = state.get("start_timestamp", None) 312 obj.end_timestamp = state.get("end_timestamp", None) 313 obj.archived = archived 314 obj.livelock = state.get("livelock", None) 315 obj.process_id = state.get("process_id", None) 316 if obj.process_id is not None: 317 obj.process_id = int(obj.process_id) 318 319 obj.ops = [] 320 obj.log_serial = 0 321 for op_state in state["ops"]: 322 op = _QueuedOpCode.Restore(op_state) 323 for log_entry in op.log: 324 obj.log_serial = max(obj.log_serial, log_entry[0]) 325 obj.ops.append(op) 326 327 cls._InitInMemory(obj, writable) 328 329 return obj
330
331 - def Serialize(self):
332 """Serialize the _QueuedJob instance. 333 334 @rtype: dict 335 @return: the serialized state 336 337 """ 338 return { 339 "id": self.id, 340 "ops": [op.Serialize() for op in self.ops], 341 "start_timestamp": self.start_timestamp, 342 "end_timestamp": self.end_timestamp, 343 "received_timestamp": self.received_timestamp, 344 "livelock": self.livelock, 345 "process_id": self.process_id, 346 }
347
348 - def CalcStatus(self):
349 """Compute the status of this job. 350 351 This function iterates over all the _QueuedOpCodes in the job and 352 based on their status, computes the job status. 353 354 The algorithm is: 355 - if we find a cancelled, or finished with error, the job 356 status will be the same 357 - otherwise, the last opcode with the status one of: 358 - waitlock 359 - canceling 360 - running 361 362 will determine the job status 363 364 - otherwise, it means either all opcodes are queued, or success, 365 and the job status will be the same 366 367 @return: the job status 368 369 """ 370 status = constants.JOB_STATUS_QUEUED 371 372 all_success = True 373 for op in self.ops: 374 if op.status == constants.OP_STATUS_SUCCESS: 375 continue 376 377 all_success = False 378 379 if op.status == constants.OP_STATUS_QUEUED: 380 pass 381 elif op.status == constants.OP_STATUS_WAITING: 382 status = constants.JOB_STATUS_WAITING 383 elif op.status == constants.OP_STATUS_RUNNING: 384 status = constants.JOB_STATUS_RUNNING 385 elif op.status == constants.OP_STATUS_CANCELING: 386 status = constants.JOB_STATUS_CANCELING 387 break 388 elif op.status == constants.OP_STATUS_ERROR: 389 status = constants.JOB_STATUS_ERROR 390 # The whole job fails if one opcode failed 391 break 392 elif op.status == constants.OP_STATUS_CANCELED: 393 status = constants.OP_STATUS_CANCELED 394 break 395 396 if all_success: 397 status = constants.JOB_STATUS_SUCCESS 398 399 return status
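To make the precedence described in the docstring concrete, here is a rough standalone sketch of the same rule using plain strings instead of the ganeti constants (illustrative only, not the module's code):

def calc_job_status(op_statuses):
  # Error/cancel states win, otherwise the last waiting/running/canceling
  # opcode decides, otherwise the job is still queued or fully successful.
  status = "queued"
  all_success = True
  for op_status in op_statuses:
    if op_status == "success":
      continue
    all_success = False
    if op_status == "waiting":
      status = "waiting"
    elif op_status == "running":
      status = "running"
    elif op_status in ("canceling", "error", "canceled"):
      status = op_status
      break
  if all_success:
    status = "success"
  return status

assert calc_job_status(["success", "running", "queued"]) == "running"
assert calc_job_status(["success", "error", "queued"]) == "error"
assert calc_job_status(["success", "success"]) == "success"

Note how a single error or canceled opcode finalizes the whole job status, while queued opcodes after a running one do not change it.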
400
401 - def CalcPriority(self):
402 """Gets the current priority for this job. 403 404 Only unfinished opcodes are considered. When all are done, the default 405 priority is used. 406 407 @rtype: int 408 409 """ 410 priorities = [op.priority for op in self.ops 411 if op.status not in constants.OPS_FINALIZED] 412 413 if not priorities: 414 # All opcodes are done, assume default priority 415 return constants.OP_PRIO_DEFAULT 416 417 return min(priorities)
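Ganeti job priorities are numeric and behave like nice values: smaller numbers mean higher priority, so taking min() over the pending opcodes above selects the most urgent one. A trivial sketch with made-up numbers (the default of 0 is an assumption mirroring constants.OP_PRIO_DEFAULT):

def calc_priority(pending_priorities, default=0):
  # Return the most urgent (numerically smallest) pending priority, or the
  # default when no opcodes are pending any more.
  if not pending_priorities:
    return default
  return min(pending_priorities)

assert calc_priority([10, -5, 0]) == -5   # -5 is the most urgent
assert calc_priority([]) == 0             # all opcodes finalized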
418
419 - def GetLogEntries(self, newer_than):
420 """Selectively returns the log entries. 421 422 @type newer_than: None or int 423 @param newer_than: if this is None, return all log entries, 424 otherwise return only the log entries with serial higher 425 than this value 426 @rtype: list 427 @return: the list of the log entries selected 428 429 """ 430 if newer_than is None: 431 serial = -1 432 else: 433 serial = newer_than 434 435 entries = [] 436 for op in self.ops: 437 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 438 439 return entries
440
441 - def MarkUnfinishedOps(self, status, result):
442 """Mark unfinished opcodes with a given status and result. 443 444 This is a utility function for marking all running or waiting to 445 be run opcodes with a given status. Opcodes which are already 446 finalised are not changed. 447 448 @param status: a given opcode status 449 @param result: the opcode result 450 451 """ 452 not_marked = True 453 for op in self.ops: 454 if op.status in constants.OPS_FINALIZED: 455 assert not_marked, "Finalized opcodes found after non-finalized ones" 456 continue 457 op.status = status 458 op.result = result 459 not_marked = False
460
461 - def Finalize(self):
462 """Marks the job as finalized. 463 464 """ 465 self.end_timestamp = TimeStampNow()
466
467 - def Cancel(self):
468 """Marks job as canceled/-ing if possible. 469 470 @rtype: tuple; (bool, string) 471 @return: Boolean describing whether job was successfully canceled or marked 472 as canceling and a text message 473 474 """ 475 status = self.CalcStatus() 476 477 if status == constants.JOB_STATUS_QUEUED: 478 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 479 "Job canceled by request") 480 self.Finalize() 481 return (True, "Job %s canceled" % self.id) 482 483 elif status == constants.JOB_STATUS_WAITING: 484 # The worker will notice the new status and cancel the job 485 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 486 return (True, "Job %s will be canceled" % self.id) 487 488 else: 489 logging.debug("Job %s is no longer waiting in the queue", self.id) 490 return (False, "Job %s is no longer waiting in the queue" % self.id)
491
492 - def ChangePriority(self, priority):
493 """Changes the job priority. 494 495 @type priority: int 496 @param priority: New priority 497 @rtype: tuple; (bool, string) 498 @return: Boolean describing whether job's priority was successfully changed 499 and a text message 500 501 """ 502 status = self.CalcStatus() 503 504 if status in constants.JOBS_FINALIZED: 505 return (False, "Job %s is finished" % self.id) 506 elif status == constants.JOB_STATUS_CANCELING: 507 return (False, "Job %s is cancelling" % self.id) 508 else: 509 assert status in (constants.JOB_STATUS_QUEUED, 510 constants.JOB_STATUS_WAITING, 511 constants.JOB_STATUS_RUNNING) 512 513 changed = False 514 for op in self.ops: 515 if (op.status == constants.OP_STATUS_RUNNING or 516 op.status in constants.OPS_FINALIZED): 517 assert not changed, \ 518 ("Found opcode for which priority should not be changed after" 519 " priority has been changed for previous opcodes") 520 continue 521 522 assert op.status in (constants.OP_STATUS_QUEUED, 523 constants.OP_STATUS_WAITING) 524 525 changed = True 526 527 # Set new priority (doesn't modify opcode input) 528 op.priority = priority 529 530 if changed: 531 return (True, ("Priorities of pending opcodes for job %s have been" 532 " changed to %s" % (self.id, priority))) 533 else: 534 return (False, "Job %s had no pending opcodes" % self.id)
535
536 - def SetPid(self, pid):
537 """Sets the job's process ID 538 539 @type pid: int 540 @param pid: the process ID 541 542 """ 543 status = self.CalcStatus() 544 545 if status in (constants.JOB_STATUS_QUEUED, 546 constants.JOB_STATUS_WAITING): 547 if self.process_id is not None: 548 logging.warning("Replacing the process id %s of job %s with %s", 549 self.process_id, self.id, pid) 550 self.process_id = pid 551 else: 552 logging.warning("Can set pid only for queued/waiting jobs")
553
554 555 -class _OpExecCallbacks(mcpu.OpExecCbBase):
556
557 - def __init__(self, queue, job, op):
558 """Initializes this class. 559 560 @type queue: L{JobQueue} 561 @param queue: Job queue 562 @type job: L{_QueuedJob} 563 @param job: Job object 564 @type op: L{_QueuedOpCode} 565 @param op: OpCode 566 567 """ 568 super(_OpExecCallbacks, self).__init__() 569 570 assert queue, "Queue is missing" 571 assert job, "Job is missing" 572 assert op, "Opcode is missing" 573 574 self._queue = queue 575 self._job = job 576 self._op = op
577
578 - def _CheckCancel(self):
579 """Raises an exception to cancel the job if asked to. 580 581 """ 582 # Cancel here if we were asked to 583 if self._op.status == constants.OP_STATUS_CANCELING: 584 logging.debug("Canceling opcode") 585 raise CancelJob()
586 587 @locking.ssynchronized(_QUEUE, shared=1)
588 - def NotifyStart(self):
589 """Mark the opcode as running, not lock-waiting. 590 591 This is called from the mcpu code as a notifier function, when the LU is 592 finally about to start the Exec() method. Of course, to have end-user 593 visible results, the opcode must be initially (before calling into 594 Processor.ExecOpCode) set to OP_STATUS_WAITING. 595 596 """ 597 assert self._op in self._job.ops 598 assert self._op.status in (constants.OP_STATUS_WAITING, 599 constants.OP_STATUS_CANCELING) 600 601 # Cancel here if we were asked to 602 self._CheckCancel() 603 604 logging.debug("Opcode is now running") 605 606 self._op.status = constants.OP_STATUS_RUNNING 607 self._op.exec_timestamp = TimeStampNow() 608 609 # And finally replicate the job status 610 self._queue.UpdateJobUnlocked(self._job)
611 612 @locking.ssynchronized(_QUEUE, shared=1)
613 - def NotifyRetry(self):
614 """Mark opcode again as lock-waiting. 615 616 This is called from the mcpu code just after calling PrepareRetry. 617 The opcode will now again acquire locks (more, hopefully). 618 619 """ 620 self._op.status = constants.OP_STATUS_WAITING 621 logging.debug("Opcode will be retried. Back to waiting.")
622 623 @locking.ssynchronized(_QUEUE, shared=1)
624 - def _AppendFeedback(self, timestamp, log_type, log_msgs):
625 """Internal feedback append function, with locks 626 627 @type timestamp: tuple (int, int) 628 @param timestamp: timestamp of the log message 629 630 @type log_type: string 631 @param log_type: log type (one of Types.ELogType) 632 633 @type log_msgs: any 634 @param log_msgs: log data to append 635 """ 636 637 # This should be removed once Feedback() has a clean interface. 638 # Feedback can be called with anything, we interpret ELogMessageList as 639 # messages that have to be individually added to the log list, but pushed 640 # in a single update. Other msgtypes are only transparently passed forward. 641 if log_type == constants.ELOG_MESSAGE_LIST: 642 log_type = constants.ELOG_MESSAGE 643 else: 644 log_msgs = [log_msgs] 645 646 for msg in log_msgs: 647 self._job.log_serial += 1 648 self._op.log.append((self._job.log_serial, timestamp, log_type, msg)) 649 self._queue.UpdateJobUnlocked(self._job, replicate=False)
650 651 # TODO: Cleanup calling conventions, make them explicit
652 - def Feedback(self, *args):
653 """Append a log entry. 654 655 Calling conventions: 656 arg[0]: (optional) string, message type (Types.ELogType) 657 arg[1]: data to be interpreted as a message 658 """ 659 assert len(args) < 3 660 661 # TODO: Use separate keyword arguments for a single string vs. a list. 662 if len(args) == 1: 663 log_type = constants.ELOG_MESSAGE 664 log_msg = args[0] 665 else: 666 (log_type, log_msg) = args 667 668 # The time is split to make serialization easier and not lose 669 # precision. 670 timestamp = utils.SplitTime(time.time()) 671 self._AppendFeedback(timestamp, log_type, log_msg)
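The two calling conventions can be shown with a small stand-in that only mimics the argument handling above (hypothetical helper, plain strings instead of the ELOG_* constants):

def parse_feedback_args(*args):
  # One argument: a plain message with the default log type.
  # Two arguments: an explicit (log_type, message) pair.
  assert len(args) < 3
  if len(args) == 1:
    return ("message", args[0])
  return args

assert parse_feedback_args("creating disks") == ("message", "creating disks")
assert parse_feedback_args("message-list", ["step 1", "step 2"]) == \
    ("message-list", ["step 1", "step 2"])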
672
673 - def CurrentPriority(self):
674 """Returns current priority for opcode. 675 676 """ 677 assert self._op.status in (constants.OP_STATUS_WAITING, 678 constants.OP_STATUS_CANCELING) 679 680 # Cancel here if we were asked to 681 self._CheckCancel() 682 683 return self._op.priority
684
685 - def SubmitManyJobs(self, jobs):
686 """Submits jobs for processing. 687 688 See L{JobQueue.SubmitManyJobs}. 689 690 """ 691 # Locking is done in job queue 692 return self._queue.SubmitManyJobs(jobs)
693
694 695 -def _EncodeOpError(err):
696 """Encodes an error which occurred while processing an opcode. 697 698 """ 699 if isinstance(err, errors.GenericError): 700 to_encode = err 701 else: 702 to_encode = errors.OpExecError(str(err)) 703 704 return errors.EncodeException(to_encode)
705
706 707 -class _TimeoutStrategyWrapper:
708 - def __init__(self, fn):
709 """Initializes this class. 710 711 """ 712 self._fn = fn 713 self._next = None
714
715 - def _Advance(self):
716 """Gets the next timeout if necessary. 717 718 """ 719 if self._next is None: 720 self._next = self._fn()
721
722 - def Peek(self):
723 """Returns the next timeout. 724 725 """ 726 self._Advance() 727 return self._next
728
729 - def Next(self):
730 """Returns the current timeout and advances the internal state. 731 732 """ 733 self._Advance() 734 result = self._next 735 self._next = None 736 return result
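Assuming the module is importable, the Peek/Next contract can be exercised like this (illustrative only): Peek() caches a value from the wrapped callable without consuming it, while Next() hands the cached value out so the following call fetches a fresh one.

import itertools
from ganeti.jqueue import _TimeoutStrategyWrapper

counter = itertools.count(1)
wrapper = _TimeoutStrategyWrapper(lambda: next(counter))

assert wrapper.Peek() == 1   # cached, not consumed
assert wrapper.Peek() == 1
assert wrapper.Next() == 1   # consumes the cached value
assert wrapper.Peek() == 2   # a new timeout is fetched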
737
738 739 -class _OpExecContext:
740 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
741 """Initializes this class. 742 743 """ 744 self.op = op 745 self.index = index 746 self.log_prefix = log_prefix 747 self.summary = op.input.Summary() 748 749 # Create local copy to modify 750 if getattr(op.input, opcodes_base.DEPEND_ATTR, None): 751 self.jobdeps = op.input.depends[:] 752 else: 753 self.jobdeps = None 754 755 self._timeout_strategy_factory = timeout_strategy_factory 756 self._ResetTimeoutStrategy()
757
758 - def _ResetTimeoutStrategy(self):
759 """Creates a new timeout strategy. 760 761 """ 762 self._timeout_strategy = \ 763 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
764
765 - def CheckPriorityIncrease(self):
766 """Checks whether priority can and should be increased. 767 768 Called when locks couldn't be acquired. 769 770 """ 771 op = self.op 772 773 # Exhausted all retries and next round should not use blocking acquire 774 # for locks? 775 if (self._timeout_strategy.Peek() is None and 776 op.priority > constants.OP_PRIO_HIGHEST): 777 logging.debug("Increasing priority") 778 op.priority -= 1 779 self._ResetTimeoutStrategy() 780 return True 781 782 return False
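Because lower numbers mean higher priority, the op.priority -= 1 above moves the opcode one step closer to OP_PRIO_HIGHEST. A hedged standalone sketch of the bump rule (the -20 bound is an assumption mirroring constants.OP_PRIO_HIGHEST):

OP_PRIO_HIGHEST = -20  # assumed value of constants.OP_PRIO_HIGHEST

def maybe_bump_priority(priority, retries_exhausted):
  # Raise the priority by one step once blocking lock attempts are
  # exhausted, but never beyond the highest possible priority.
  if retries_exhausted and priority > OP_PRIO_HIGHEST:
    return priority - 1
  return priority

assert maybe_bump_priority(0, True) == -1
assert maybe_bump_priority(-20, True) == -20   # already at the top
assert maybe_bump_priority(0, False) == 0      # still retrying with timeouts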
783
784 - def GetNextLockTimeout(self):
785 """Returns the next lock acquire timeout. 786 787 """ 788 return self._timeout_strategy.Next()
789
790 791 -class _JobProcessor(object):
792 (DEFER, 793 WAITDEP, 794 FINISHED) = range(1, 4) 795
796 - def __init__(self, queue, opexec_fn, job, 797 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
798 """Initializes this class. 799 800 """ 801 self.queue = queue 802 self.opexec_fn = opexec_fn 803 self.job = job 804 self._timeout_strategy_factory = _timeout_strategy_factory
805 806 @staticmethod
807 - def _FindNextOpcode(job, timeout_strategy_factory):
808 """Locates the next opcode to run. 809 810 @type job: L{_QueuedJob} 811 @param job: Job object 812 @param timeout_strategy_factory: Callable to create new timeout strategy 813 814 """ 815 # Create some sort of a cache to speed up locating next opcode for future 816 # lookups 817 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 818 # pending and one for processed ops. 819 if job.ops_iter is None: 820 job.ops_iter = enumerate(job.ops) 821 822 # Find next opcode to run 823 while True: 824 try: 825 (idx, op) = job.ops_iter.next() 826 except StopIteration: 827 raise errors.ProgrammerError("Called for a finished job") 828 829 if op.status == constants.OP_STATUS_RUNNING: 830 # Found an opcode already marked as running 831 raise errors.ProgrammerError("Called for job marked as running") 832 833 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 834 timeout_strategy_factory) 835 836 if op.status not in constants.OPS_FINALIZED: 837 return opctx 838 839 # This is a job that was partially completed before master daemon 840 # shutdown, so it can be expected that some opcodes are already 841 # completed successfully (if any did error out, then the whole job 842 # should have been aborted and not resubmitted for processing). 843 logging.info("%s: opcode %s already processed, skipping", 844 opctx.log_prefix, opctx.summary)
845 846 @staticmethod
847 - def _MarkWaitlock(job, op):
848 """Marks an opcode as waiting for locks. 849 850 The job's start timestamp is also set if necessary. 851 852 @type job: L{_QueuedJob} 853 @param job: Job object 854 @type op: L{_QueuedOpCode} 855 @param op: Opcode object 856 857 """ 858 assert op in job.ops 859 assert op.status in (constants.OP_STATUS_QUEUED, 860 constants.OP_STATUS_WAITING) 861 862 update = False 863 864 op.result = None 865 866 if op.status == constants.OP_STATUS_QUEUED: 867 op.status = constants.OP_STATUS_WAITING 868 update = True 869 870 if op.start_timestamp is None: 871 op.start_timestamp = TimeStampNow() 872 update = True 873 874 if job.start_timestamp is None: 875 job.start_timestamp = op.start_timestamp 876 update = True 877 878 assert op.status == constants.OP_STATUS_WAITING 879 880 return update
881 882 @staticmethod
883 - def _CheckDependencies(queue, job, opctx):
884 """Checks if an opcode has dependencies and if so, processes them. 885 886 @type queue: L{JobQueue} 887 @param queue: Queue object 888 @type job: L{_QueuedJob} 889 @param job: Job object 890 @type opctx: L{_OpExecContext} 891 @param opctx: Opcode execution context 892 @rtype: bool 893 @return: Whether opcode will be re-scheduled by dependency tracker 894 895 """ 896 op = opctx.op 897 898 result = False 899 900 while opctx.jobdeps: 901 (dep_job_id, dep_status) = opctx.jobdeps[0] 902 903 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, 904 dep_status) 905 assert ht.TNonEmptyString(depmsg), "No dependency message" 906 907 logging.info("%s: %s", opctx.log_prefix, depmsg) 908 909 if depresult == _JobDependencyManager.CONTINUE: 910 # Remove dependency and continue 911 opctx.jobdeps.pop(0) 912 913 elif depresult == _JobDependencyManager.WAIT: 914 # Need to wait for notification, dependency tracker will re-add job 915 # to workerpool 916 result = True 917 break 918 919 elif depresult == _JobDependencyManager.CANCEL: 920 # Job was cancelled, cancel this job as well 921 job.Cancel() 922 assert op.status == constants.OP_STATUS_CANCELING 923 break 924 925 elif depresult in (_JobDependencyManager.WRONGSTATUS, 926 _JobDependencyManager.ERROR): 927 # Job failed or there was an error, this job must fail 928 op.status = constants.OP_STATUS_ERROR 929 op.result = _EncodeOpError(errors.OpExecError(depmsg)) 930 break 931 932 else: 933 raise errors.ProgrammerError("Unknown dependency result '%s'" % 934 depresult) 935 936 return result
937
938 - def _ExecOpCodeUnlocked(self, opctx):
939 """Processes one opcode and returns the result. 940 941 """ 942 op = opctx.op 943 944 assert op.status in (constants.OP_STATUS_WAITING, 945 constants.OP_STATUS_CANCELING) 946 947 # The very last check if the job was cancelled before trying to execute 948 if op.status == constants.OP_STATUS_CANCELING: 949 return (constants.OP_STATUS_CANCELING, None) 950 951 timeout = opctx.GetNextLockTimeout() 952 953 try: 954 # Make sure not to hold queue lock while calling ExecOpCode 955 result = self.opexec_fn(op.input, 956 _OpExecCallbacks(self.queue, self.job, op), 957 timeout=timeout) 958 except mcpu.LockAcquireTimeout: 959 assert timeout is not None, "Received timeout for blocking acquire" 960 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 961 962 assert op.status in (constants.OP_STATUS_WAITING, 963 constants.OP_STATUS_CANCELING) 964 965 # Was job cancelled while we were waiting for the lock? 966 if op.status == constants.OP_STATUS_CANCELING: 967 return (constants.OP_STATUS_CANCELING, None) 968 969 # Stay in waitlock while trying to re-acquire lock 970 return (constants.OP_STATUS_WAITING, None) 971 except CancelJob: 972 logging.exception("%s: Canceling job", opctx.log_prefix) 973 assert op.status == constants.OP_STATUS_CANCELING 974 return (constants.OP_STATUS_CANCELING, None) 975 976 except Exception, err: # pylint: disable=W0703 977 logging.exception("%s: Caught exception in %s", 978 opctx.log_prefix, opctx.summary) 979 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 980 else: 981 logging.debug("%s: %s successful", 982 opctx.log_prefix, opctx.summary) 983 return (constants.OP_STATUS_SUCCESS, result)
984
985 - def __call__(self, _nextop_fn=None):
986 """Continues execution of a job. 987 988 @param _nextop_fn: Callback function for tests 989 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should 990 be deferred and C{WAITDEP} if the dependency manager 991 (L{_JobDependencyManager}) will re-schedule the job when appropriate 992 993 """ 994 queue = self.queue 995 job = self.job 996 997 logging.debug("Processing job %s", job.id) 998 999 queue.acquire(shared=1) 1000 try: 1001 opcount = len(job.ops) 1002 1003 assert job.writable, "Expected writable job" 1004 1005 # Don't do anything for finalized jobs 1006 if job.CalcStatus() in constants.JOBS_FINALIZED: 1007 return self.FINISHED 1008 1009 # Is a previous opcode still pending? 1010 if job.cur_opctx: 1011 opctx = job.cur_opctx 1012 job.cur_opctx = None 1013 else: 1014 if __debug__ and _nextop_fn: 1015 _nextop_fn() 1016 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 1017 1018 op = opctx.op 1019 1020 # Consistency check 1021 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 1022 constants.OP_STATUS_CANCELING) 1023 for i in job.ops[opctx.index + 1:]) 1024 1025 assert op.status in (constants.OP_STATUS_QUEUED, 1026 constants.OP_STATUS_WAITING, 1027 constants.OP_STATUS_CANCELING) 1028 1029 assert (op.priority <= constants.OP_PRIO_LOWEST and 1030 op.priority >= constants.OP_PRIO_HIGHEST) 1031 1032 waitjob = None 1033 1034 if op.status != constants.OP_STATUS_CANCELING: 1035 assert op.status in (constants.OP_STATUS_QUEUED, 1036 constants.OP_STATUS_WAITING) 1037 1038 # Prepare to start opcode 1039 if self._MarkWaitlock(job, op): 1040 # Write to disk 1041 queue.UpdateJobUnlocked(job) 1042 1043 assert op.status == constants.OP_STATUS_WAITING 1044 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1045 assert job.start_timestamp and op.start_timestamp 1046 assert waitjob is None 1047 1048 # Check if waiting for a job is necessary 1049 waitjob = self._CheckDependencies(queue, job, opctx) 1050 1051 assert op.status in (constants.OP_STATUS_WAITING, 1052 constants.OP_STATUS_CANCELING, 1053 constants.OP_STATUS_ERROR) 1054 1055 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, 1056 constants.OP_STATUS_ERROR)): 1057 logging.info("%s: opcode %s waiting for locks", 1058 opctx.log_prefix, opctx.summary) 1059 1060 assert not opctx.jobdeps, "Not all dependencies were removed" 1061 1062 queue.release() 1063 try: 1064 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1065 finally: 1066 queue.acquire(shared=1) 1067 1068 op.status = op_status 1069 op.result = op_result 1070 1071 assert not waitjob 1072 1073 if op.status in (constants.OP_STATUS_WAITING, 1074 constants.OP_STATUS_QUEUED): 1075 # waiting: Couldn't get locks in time 1076 # queued: Queue is shutting down 1077 assert not op.end_timestamp 1078 else: 1079 # Finalize opcode 1080 op.end_timestamp = TimeStampNow() 1081 1082 if op.status == constants.OP_STATUS_CANCELING: 1083 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1084 for i in job.ops[opctx.index:]) 1085 else: 1086 assert op.status in constants.OPS_FINALIZED 1087 1088 if op.status == constants.OP_STATUS_QUEUED: 1089 # Queue is shutting down 1090 assert not waitjob 1091 1092 finalize = False 1093 1094 # Reset context 1095 job.cur_opctx = None 1096 1097 # In no case must the status be finalized here 1098 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1099 1100 elif op.status == constants.OP_STATUS_WAITING or waitjob: 1101 finalize = False 1102 1103 if not waitjob and opctx.CheckPriorityIncrease(): 1104 # 
Priority was changed, need to update on-disk file 1105 queue.UpdateJobUnlocked(job) 1106 1107 # Keep around for another round 1108 job.cur_opctx = opctx 1109 1110 assert (op.priority <= constants.OP_PRIO_LOWEST and 1111 op.priority >= constants.OP_PRIO_HIGHEST) 1112 1113 # In no case must the status be finalized here 1114 assert job.CalcStatus() == constants.JOB_STATUS_WAITING 1115 1116 else: 1117 # Ensure all opcodes so far have been successful 1118 assert (opctx.index == 0 or 1119 compat.all(i.status == constants.OP_STATUS_SUCCESS 1120 for i in job.ops[:opctx.index])) 1121 1122 # Reset context 1123 job.cur_opctx = None 1124 1125 if op.status == constants.OP_STATUS_SUCCESS: 1126 finalize = False 1127 1128 elif op.status == constants.OP_STATUS_ERROR: 1129 # If we get here, we cannot afford to check for any consistency 1130 # any more, we just want to clean up. 1131 # TODO: Actually, it wouldn't be a bad idea to start a timer 1132 # here to kill the whole process. 1133 to_encode = errors.OpExecError("Preceding opcode failed") 1134 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1135 _EncodeOpError(to_encode)) 1136 finalize = True 1137 elif op.status == constants.OP_STATUS_CANCELING: 1138 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1139 "Job canceled by request") 1140 finalize = True 1141 1142 else: 1143 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1144 1145 if opctx.index == (opcount - 1): 1146 # Finalize on last opcode 1147 finalize = True 1148 1149 if finalize: 1150 # All opcodes have been run, finalize job 1151 job.Finalize() 1152 1153 # Write to disk. If the job status is final, this is the final write 1154 # allowed. Once the file has been written, it can be archived anytime. 1155 queue.UpdateJobUnlocked(job) 1156 1157 assert not waitjob 1158 1159 if finalize: 1160 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1161 return self.FINISHED 1162 1163 assert not waitjob or queue.depmgr.JobWaiting(job) 1164 1165 if waitjob: 1166 return self.WAITDEP 1167 else: 1168 return self.DEFER 1169 finally: 1170 assert job.writable, "Job became read-only while being processed" 1171 queue.release()
1172
1173 1174 -def _EvaluateJobProcessorResult(depmgr, job, result):
1175 """Looks at a result from L{_JobProcessor} for a job. 1176 1177 To be used in a L{_JobQueueWorker}. 1178 1179 """ 1180 if result == _JobProcessor.FINISHED: 1181 # Notify waiting jobs 1182 depmgr.NotifyWaiters(job.id) 1183 1184 elif result == _JobProcessor.DEFER: 1185 # Schedule again 1186 raise workerpool.DeferTask(priority=job.CalcPriority()) 1187 1188 elif result == _JobProcessor.WAITDEP: 1189 # No-op, dependency manager will re-schedule 1190 pass 1191 1192 else: 1193 raise errors.ProgrammerError("Job processor returned unknown status %s" % 1194 (result, ))
1195
1196 1197 -class _JobQueueWorker(workerpool.BaseWorker):
1198 """The actual job workers. 1199 1200 """
1201 - def RunTask(self, job): # pylint: disable=W0221
1202 """Job executor. 1203 1204 @type job: L{_QueuedJob} 1205 @param job: the job to be processed 1206 1207 """ 1208 assert job.writable, "Expected writable job" 1209 1210 # Ensure only one worker is active on a single job. If a job registers for 1211 # a dependency job, and the other job notifies before the first worker is 1212 # done, the job can end up in the tasklist more than once. 1213 job.processor_lock.acquire() 1214 try: 1215 return self._RunTaskInner(job) 1216 finally: 1217 job.processor_lock.release()
1218
1219 - def _RunTaskInner(self, job):
1220 """Executes a job. 1221 1222 Must be called with per-job lock acquired. 1223 1224 """ 1225 queue = job.queue 1226 assert queue == self.pool.queue 1227 1228 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) 1229 setname_fn(None) 1230 1231 proc = mcpu.Processor(queue.context, job.id) 1232 1233 # Create wrapper for setting thread name 1234 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, 1235 proc.ExecOpCode) 1236 1237 _EvaluateJobProcessorResult(queue.depmgr, job, 1238 _JobProcessor(queue, wrap_execop_fn, job)())
1239 1240 @staticmethod
1241 - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1242 """Updates the worker thread name to include a short summary of the opcode. 1243 1244 @param setname_fn: Callable setting worker thread name 1245 @param execop_fn: Callable for executing opcode (usually 1246 L{mcpu.Processor.ExecOpCode}) 1247 1248 """ 1249 setname_fn(op) 1250 try: 1251 return execop_fn(op, *args, **kwargs) 1252 finally: 1253 setname_fn(None)
1254 1255 @staticmethod
1256 - def _GetWorkerName(job, op):
1257 """Sets the worker thread name. 1258 1259 @type job: L{_QueuedJob} 1260 @type op: L{opcodes.OpCode} 1261 1262 """ 1263 parts = ["Job%s" % job.id] 1264 1265 if op: 1266 parts.append(op.TinySummary()) 1267 1268 return "/".join(parts)
1269
1270 1271 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1272 """Simple class implementing a job-processing workerpool. 1273 1274 """
1275 - def __init__(self, queue):
1276 super(_JobQueueWorkerPool, self).__init__("Jq", 1277 JOBQUEUE_THREADS, 1278 _JobQueueWorker) 1279 self.queue = queue
1280
1281 1282 -class _JobDependencyManager:
1283 """Keeps track of job dependencies. 1284 1285 """ 1286 (WAIT, 1287 ERROR, 1288 CANCEL, 1289 CONTINUE, 1290 WRONGSTATUS) = range(1, 6) 1291
1292 - def __init__(self, getstatus_fn, enqueue_fn):
1293 """Initializes this class. 1294 1295 """ 1296 self._getstatus_fn = getstatus_fn 1297 self._enqueue_fn = enqueue_fn 1298 1299 self._waiters = {} 1300 self._lock = locking.SharedLock("JobDepMgr")
1301 1302 @locking.ssynchronized(_LOCK, shared=1)
1303 - def GetLockInfo(self, requested): # pylint: disable=W0613
1304 """Retrieves information about waiting jobs. 1305 1306 @type requested: set 1307 @param requested: Requested information, see C{query.LQ_*} 1308 1309 """ 1310 # No need to sort here, that's being done by the lock manager and query 1311 # library. There are no priorities for notifying jobs, hence all show up as 1312 # one item under "pending". 1313 return [("job/%s" % job_id, None, None, 1314 [("job", [job.id for job in waiters])]) 1315 for job_id, waiters in self._waiters.items() 1316 if waiters]
1317 1318 @locking.ssynchronized(_LOCK, shared=1)
1319 - def JobWaiting(self, job):
1320 """Checks if a job is waiting. 1321 1322 """ 1323 return compat.any(job in jobs 1324 for jobs in self._waiters.values())
1325 1326 @locking.ssynchronized(_LOCK)
1327 - def CheckAndRegister(self, job, dep_job_id, dep_status):
1328 """Checks if a dependency job has the requested status. 1329 1330 If the other job is not yet in a finalized status, the calling job will be 1331 notified (re-added to the workerpool) at a later point. 1332 1333 @type job: L{_QueuedJob} 1334 @param job: Job object 1335 @type dep_job_id: int 1336 @param dep_job_id: ID of dependency job 1337 @type dep_status: list 1338 @param dep_status: Required status 1339 1340 """ 1341 assert ht.TJobId(job.id) 1342 assert ht.TJobId(dep_job_id) 1343 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) 1344 1345 if job.id == dep_job_id: 1346 return (self.ERROR, "Job can't depend on itself") 1347 1348 # Get status of dependency job 1349 try: 1350 status = self._getstatus_fn(dep_job_id) 1351 except errors.JobLost, err: 1352 return (self.ERROR, "Dependency error: %s" % err) 1353 1354 assert status in constants.JOB_STATUS_ALL 1355 1356 job_id_waiters = self._waiters.setdefault(dep_job_id, set()) 1357 1358 if status not in constants.JOBS_FINALIZED: 1359 # Register for notification and wait for job to finish 1360 job_id_waiters.add(job) 1361 return (self.WAIT, 1362 "Need to wait for job %s, wanted status '%s'" % 1363 (dep_job_id, dep_status)) 1364 1365 # Remove from waiters list 1366 if job in job_id_waiters: 1367 job_id_waiters.remove(job) 1368 1369 if (status == constants.JOB_STATUS_CANCELED and 1370 constants.JOB_STATUS_CANCELED not in dep_status): 1371 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) 1372 1373 elif not dep_status or status in dep_status: 1374 return (self.CONTINUE, 1375 "Dependency job %s finished with status '%s'" % 1376 (dep_job_id, status)) 1377 1378 else: 1379 return (self.WRONGSTATUS, 1380 "Dependency job %s finished with status '%s'," 1381 " not one of '%s' as required" % 1382 (dep_job_id, status, utils.CommaJoin(dep_status)))
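A dependency entry as handled here is a (job_id, wanted_statuses) pair. The decision taken once the dependency has finalized can be sketched standalone with plain strings in place of the ganeti constants (illustrative only):

def check_finalized_dependency(status, wanted):
  # Mirrors the tail of CheckAndRegister: an unexpected cancellation
  # cancels the waiting job, a wanted (or unconstrained) status lets it
  # continue, anything else is a wrong-status error.
  if status == "canceled" and "canceled" not in wanted:
    return "cancel"
  if not wanted or status in wanted:
    return "continue"
  return "wrongstatus"

assert check_finalized_dependency("success", ["success"]) == "continue"
assert check_finalized_dependency("canceled", []) == "cancel"
assert check_finalized_dependency("error", ["success"]) == "wrongstatus"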
1383
1384 - def _RemoveEmptyWaitersUnlocked(self):
1385 """Remove all jobs without actual waiters. 1386 1387 """ 1388 for job_id in [job_id for (job_id, waiters) in self._waiters.items() 1389 if not waiters]: 1390 del self._waiters[job_id]
1391
1392 - def NotifyWaiters(self, job_id):
1393 """Notifies all jobs waiting for a certain job ID. 1394 1395 @attention: Do not call until L{CheckAndRegister} returned a status other 1396 than C{WAITDEP} for C{job_id}, or behaviour is undefined 1397 @type job_id: int 1398 @param job_id: Job ID 1399 1400 """ 1401 assert ht.TJobId(job_id) 1402 1403 self._lock.acquire() 1404 try: 1405 self._RemoveEmptyWaitersUnlocked() 1406 1407 jobs = self._waiters.pop(job_id, None) 1408 finally: 1409 self._lock.release() 1410 1411 if jobs: 1412 # Re-add jobs to workerpool 1413 logging.debug("Re-adding %s jobs which were waiting for job %s", 1414 len(jobs), job_id) 1415 self._enqueue_fn(jobs)
1416
1417 1418 -class JobQueue(object):
1419 """Queue used to manage the jobs. 1420 1421 """
1422 - def __init__(self, context, cfg):
1423 """Constructor for JobQueue. 1424 1425 The constructor will initialize the job queue object and then 1426 start loading the current jobs from disk, either for starting them 1427 (if they were queued) or for aborting them (if they were already 1428 running). 1429 1430 @type context: GanetiContext 1431 @param context: the context object for access to the configuration 1432 data and other ganeti objects 1433 1434 """ 1435 self.primary_jid = None 1436 self.context = context 1437 self._memcache = weakref.WeakValueDictionary() 1438 self._my_hostname = netutils.Hostname.GetSysName() 1439 1440 # The Big JobQueue lock. If a code block or method acquires it in shared 1441 # mode, it must guarantee concurrency with all the code acquiring it in 1442 # shared mode, including itself. In order not to acquire it at all, 1443 # concurrency must be guaranteed with all code acquiring it in shared mode 1444 # and all code acquiring it exclusively. 1445 self._lock = locking.SharedLock("JobQueue") 1446 1447 self.acquire = self._lock.acquire 1448 self.release = self._lock.release 1449 1450 # Read serial file 1451 self._last_serial = jstore.ReadSerial() 1452 assert self._last_serial is not None, ("Serial file was modified between" 1453 " check in jstore and here") 1454 1455 # Get initial list of nodes 1456 self._nodes = dict((n.name, n.primary_ip) 1457 for n in cfg.GetAllNodesInfo().values() 1458 if n.master_candidate) 1459 1460 # Remove master node 1461 self._nodes.pop(self._my_hostname, None) 1462 1463 # TODO: Check consistency across nodes 1464 1465 self._queue_size = None 1466 self._UpdateQueueSizeUnlocked() 1467 assert ht.TInt(self._queue_size) 1468 1469 # Job dependencies 1470 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, 1471 self._EnqueueJobs) 1472 1473 # Setup worker pool 1474 self._wpool = _JobQueueWorkerPool(self)
1475
1476 - def _PickupJobUnlocked(self, job_id):
1477 """Load a job from the job queue 1478 1479 Pick up a job that already is in the job queue and start/resume it. 1480 1481 """ 1482 if self.primary_jid: 1483 logging.warning("Job process asked to pick up %s, but already has %s", 1484 job_id, self.primary_jid) 1485 1486 self.primary_jid = int(job_id) 1487 1488 job = self._LoadJobUnlocked(job_id) 1489 1490 if job is None: 1491 logging.warning("Job %s could not be read", job_id) 1492 return 1493 1494 job.AddReasons(pickup=True) 1495 1496 status = job.CalcStatus() 1497 if status == constants.JOB_STATUS_QUEUED: 1498 job.SetPid(os.getpid()) 1499 self._EnqueueJobsUnlocked([job]) 1500 logging.info("Restarting job %s", job.id) 1501 1502 elif status in (constants.JOB_STATUS_RUNNING, 1503 constants.JOB_STATUS_WAITING, 1504 constants.JOB_STATUS_CANCELING): 1505 logging.warning("Unfinished job %s found: %s", job.id, job) 1506 1507 if status == constants.JOB_STATUS_WAITING: 1508 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1509 job.SetPid(os.getpid()) 1510 self._EnqueueJobsUnlocked([job]) 1511 logging.info("Restarting job %s", job.id) 1512 else: 1513 to_encode = errors.OpExecError("Unclean master daemon shutdown") 1514 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1515 _EncodeOpError(to_encode)) 1516 job.Finalize() 1517 1518 self.UpdateJobUnlocked(job)
1519 1520 @locking.ssynchronized(_LOCK)
1521 - def PickupJob(self, job_id):
1522 self._PickupJobUnlocked(job_id)
1523
1524 - def _GetRpc(self, address_list):
1525 """Gets RPC runner with context. 1526 1527 """ 1528 return rpc.JobQueueRunner(self.context, address_list)
1529 1530 @locking.ssynchronized(_LOCK)
1531 - def AddNode(self, node):
1532 """Register a new node with the queue. 1533 1534 @type node: L{objects.Node} 1535 @param node: the node object to be added 1536 1537 """ 1538 node_name = node.name 1539 assert node_name != self._my_hostname 1540 1541 # Clean queue directory on added node 1542 result = self._GetRpc(None).call_jobqueue_purge(node_name) 1543 msg = result.fail_msg 1544 if msg: 1545 logging.warning("Cannot cleanup queue directory on node %s: %s", 1546 node_name, msg) 1547 1548 if not node.master_candidate: 1549 # remove if existing, ignoring errors 1550 self._nodes.pop(node_name, None) 1551 # and skip the replication of the job ids 1552 return 1553 1554 # Upload the whole queue excluding archived jobs 1555 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1556 1557 # Upload current serial file 1558 files.append(pathutils.JOB_QUEUE_SERIAL_FILE) 1559 1560 # Static address list 1561 addrs = [node.primary_ip] 1562 1563 for file_name in files: 1564 # Read file content 1565 content = utils.ReadFile(file_name) 1566 1567 result = _CallJqUpdate(self._GetRpc(addrs), [node_name], 1568 file_name, content) 1569 msg = result[node_name].fail_msg 1570 if msg: 1571 logging.error("Failed to upload file %s to node %s: %s", 1572 file_name, node_name, msg) 1573 1574 msg = result[node_name].fail_msg 1575 if msg: 1576 logging.error("Failed to set queue drained flag on node %s: %s", 1577 node_name, msg) 1578 1579 self._nodes[node_name] = node.primary_ip
1580 1581 @locking.ssynchronized(_LOCK)
1582 - def RemoveNode(self, node_name):
1583 """Callback called when removing nodes from the cluster. 1584 1585 @type node_name: str 1586 @param node_name: the name of the node to remove 1587 1588 """ 1589 self._nodes.pop(node_name, None)
1590 1591 @staticmethod
1592 - def _CheckRpcResult(result, nodes, failmsg):
1593 """Verifies the status of an RPC call. 1594 1595 Since we aim to keep consistency should this node (the current 1596 master) fail, we will log errors if our rpc calls fail, and especially 1597 log the case when more than half of the nodes fail. 1598 1599 @param result: the data as returned from the rpc call 1600 @type nodes: list 1601 @param nodes: the list of nodes we made the call to 1602 @type failmsg: str 1603 @param failmsg: the identifier to be used for logging 1604 1605 """ 1606 failed = [] 1607 success = [] 1608 1609 for node in nodes: 1610 msg = result[node].fail_msg 1611 if msg: 1612 failed.append(node) 1613 logging.error("RPC call %s (%s) failed on node %s: %s", 1614 result[node].call, failmsg, node, msg) 1615 else: 1616 success.append(node) 1617 1618 # +1 for the master node 1619 if (len(success) + 1) < len(failed): 1620 # TODO: Handle failing nodes 1621 logging.error("More than half of the nodes failed")
1622
1623 - def _GetNodeIp(self):
1624 """Helper for returning the node name/ip list. 1625 1626 @rtype: (list, list) 1627 @return: a tuple of two lists, the first one with the node 1628 names and the second one with the node addresses 1629 1630 """ 1631 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1632 name_list = self._nodes.keys() 1633 addr_list = [self._nodes[name] for name in name_list] 1634 return name_list, addr_list
1635
1636 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1637 """Writes a file locally and then replicates it to all nodes. 1638 1639 This function will replace the contents of a file on the local 1640 node and then replicate it to all the other nodes we have. 1641 1642 @type file_name: str 1643 @param file_name: the path of the file to be replicated 1644 @type data: str 1645 @param data: the new contents of the file 1646 @type replicate: boolean 1647 @param replicate: whether to spread the changes to the remote nodes 1648 1649 """ 1650 getents = runtime.GetEnts() 1651 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1652 gid=getents.daemons_gid, 1653 mode=constants.JOB_QUEUE_FILES_PERMS) 1654 1655 if replicate: 1656 names, addrs = self._GetNodeIp() 1657 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) 1658 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1659
1660 - def _RenameFilesUnlocked(self, rename):
1661 """Renames a file locally and then replicate the change. 1662 1663 This function will rename a file in the local queue directory 1664 and then replicate this rename to all the other nodes we have. 1665 1666 @type rename: list of (old, new) 1667 @param rename: List containing tuples mapping old to new names 1668 1669 """ 1670 # Rename them locally 1671 for old, new in rename: 1672 utils.RenameFile(old, new, mkdir=True) 1673 1674 # ... and on all nodes 1675 names, addrs = self._GetNodeIp() 1676 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) 1677 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1678 1679 @staticmethod
1680 - def _GetJobPath(job_id):
1681 """Returns the job file for a given job id. 1682 1683 @type job_id: str 1684 @param job_id: the job identifier 1685 @rtype: str 1686 @return: the path to the job file 1687 1688 """ 1689 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1690 1691 @staticmethod
1692 - def _GetArchivedJobPath(job_id):
1693 """Returns the archived job file for a given job id. 1694 1695 @type job_id: str 1696 @param job_id: the job identifier 1697 @rtype: str 1698 @return: the path to the archived job file 1699 1700 """ 1701 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, 1702 jstore.GetArchiveDirectory(job_id), 1703 "job-%s" % job_id)
1704 1705 @staticmethod
1706 - def _DetermineJobDirectories(archived):
1707 """Build list of directories containing job files. 1708 1709 @type archived: bool 1710 @param archived: Whether to include directories for archived jobs 1711 @rtype: list 1712 1713 """ 1714 result = [pathutils.QUEUE_DIR] 1715 1716 if archived: 1717 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR 1718 result.extend(map(compat.partial(utils.PathJoin, archive_path), 1719 utils.ListVisibleFiles(archive_path))) 1720 1721 return result
1722 1723 @classmethod
1724 - def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1725 """Return all known job IDs. 1726 1727 The method only looks at disk because it's a requirement that all 1728 jobs are present on disk (so in the _memcache we don't have any 1729 extra IDs). 1730 1731 @type sort: boolean 1732 @param sort: perform sorting on the returned job ids 1733 @rtype: list 1734 @return: the list of job IDs 1735 1736 """ 1737 jlist = [] 1738 1739 for path in cls._DetermineJobDirectories(archived): 1740 for filename in utils.ListVisibleFiles(path): 1741 m = constants.JOB_FILE_RE.match(filename) 1742 if m: 1743 jlist.append(int(m.group(1))) 1744 1745 if sort: 1746 jlist.sort() 1747 return jlist
1748
1749 - def _LoadJobUnlocked(self, job_id):
1750 """Loads a job from the disk or memory. 1751 1752 Given a job id, this will return the cached job object if 1753 existing, or try to load the job from the disk. If loading from 1754 disk, it will also add the job to the cache. 1755 1756 @type job_id: int 1757 @param job_id: the job id 1758 @rtype: L{_QueuedJob} or None 1759 @return: either None or the job object 1760 1761 """ 1762 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" 1763 1764 job = self._memcache.get(job_id, None) 1765 if job: 1766 logging.debug("Found job %s in memcache", job_id) 1767 assert job.writable, "Found read-only job in memcache" 1768 return job 1769 1770 try: 1771 job = self._LoadJobFromDisk(job_id, False) 1772 if job is None: 1773 return job 1774 except errors.JobFileCorrupted: 1775 old_path = self._GetJobPath(job_id) 1776 new_path = self._GetArchivedJobPath(job_id) 1777 if old_path == new_path: 1778 # job already archived (future case) 1779 logging.exception("Can't parse job %s", job_id) 1780 else: 1781 # non-archived case 1782 logging.exception("Can't parse job %s, will archive.", job_id) 1783 self._RenameFilesUnlocked([(old_path, new_path)]) 1784 return None 1785 1786 assert job.writable, "Job just loaded is not writable" 1787 1788 self._memcache[job_id] = job 1789 logging.debug("Added job %s to the cache", job_id) 1790 return job
1791
1792 - def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1793 """Load the given job file from disk. 1794 1795 Given a job file, read, load and restore it in a _QueuedJob format. 1796 1797 @type job_id: int 1798 @param job_id: job identifier 1799 @type try_archived: bool 1800 @param try_archived: Whether to try loading an archived job 1801 @rtype: L{_QueuedJob} or None 1802 @return: either None or the job object 1803 1804 """ 1805 path_functions = [(self._GetJobPath, False)] 1806 1807 if try_archived: 1808 path_functions.append((self._GetArchivedJobPath, True)) 1809 1810 raw_data = None 1811 archived = None 1812 1813 for (fn, archived) in path_functions: 1814 filepath = fn(job_id) 1815 logging.debug("Loading job from %s", filepath) 1816 try: 1817 raw_data = utils.ReadFile(filepath) 1818 except EnvironmentError, err: 1819 if err.errno != errno.ENOENT: 1820 raise 1821 else: 1822 break 1823 1824 if not raw_data: 1825 logging.debug("No data available for job %s", job_id) 1826 if int(job_id) == self.primary_jid: 1827 logging.warning("My own job file (%s) disappeared;" 1828 " this should only happen at cluster destruction", 1829 job_id) 1830 if mcpu.lusExecuting[0] == 0: 1831 logging.warning("Not in execution; cleaning up myself due to missing" 1832 " job file") 1833 logging.shutdown() 1834 os._exit(1) # pylint: disable=W0212 1835 return None 1836 1837 if writable is None: 1838 writable = not archived 1839 1840 try: 1841 data = serializer.LoadJson(raw_data) 1842 job = _QueuedJob.Restore(self, data, writable, archived) 1843 except Exception, err: # pylint: disable=W0703 1844 raise errors.JobFileCorrupted(err) 1845 1846 return job
1847
1848 - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1849 """Load the given job file from disk. 1850 1851 Given a job file, read, load and restore it in a _QueuedJob format. 1852 In case of error reading the job, it gets returned as None, and the 1853 exception is logged. 1854 1855 @type job_id: int 1856 @param job_id: job identifier 1857 @type try_archived: bool 1858 @param try_archived: Whether to try loading an archived job 1859 @rtype: L{_QueuedJob} or None 1860 @return: either None or the job object 1861 1862 """ 1863 try: 1864 return self._LoadJobFromDisk(job_id, try_archived, writable=writable) 1865 except (errors.JobFileCorrupted, EnvironmentError): 1866 logging.exception("Can't load/parse job %s", job_id) 1867 return None
1868
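As a usage sketch (the job ID and the `queue` variable are assumptions), a caller that only needs to inspect a job would request a read-only copy and handle the None case, much like _GetJobStatusForDependencies does further down:

  # Hypothetical caller; assumes `queue` is an initialized JobQueue instance.
  job = queue.SafeLoadJobFromDisk(1234, True, writable=False)
  if job is None:
    logging.info("Job 1234 could not be loaded (missing or corrupted)")
  else:
    status = job.CalcStatus()       # read-only inspection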
1869 - def _UpdateQueueSizeUnlocked(self):
1870 """Update the queue size. 1871 1872 """ 1873 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1874 1875 @classmethod
1876 - def SubmitManyJobs(cls, jobs):
1877 """Create and store multiple jobs. 1878 1879 """ 1880 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1881 1882 @staticmethod
1883 - def _FormatSubmitError(msg, ops):
1884 """Formats errors which occurred while submitting a job. 1885 1886 """ 1887 return ("%s; opcodes %s" % 1888 (msg, utils.CommaJoin(op.Summary() for op in ops)))
1889 1890 @staticmethod
1891 - def _ResolveJobDependencies(resolve_fn, deps):
1892 """Resolves relative job IDs in dependencies. 1893 1894 @type resolve_fn: callable 1895 @param resolve_fn: Function to resolve a relative job ID 1896 @type deps: list 1897 @param deps: Dependencies 1898 @rtype: tuple; (boolean, string or list) 1899 @return: If successful (first tuple item), the returned list contains 1900 resolved job IDs along with the requested status; if not successful, 1901 the second element is an error message 1902 1903 """ 1904 result = [] 1905 1906 for (dep_job_id, dep_status) in deps: 1907 if ht.TRelativeJobId(dep_job_id): 1908 assert ht.TInt(dep_job_id) and dep_job_id < 0 1909 try: 1910 job_id = resolve_fn(dep_job_id) 1911 except IndexError: 1912 # Abort 1913 return (False, "Unable to resolve relative job ID %s" % dep_job_id) 1914 else: 1915 job_id = dep_job_id 1916 1917 result.append((job_id, dep_status)) 1918 1919 return (True, result)
1920 1921 @locking.ssynchronized(_LOCK)
1922 - def _EnqueueJobs(self, jobs):
1923 """Helper function to add jobs to worker pool's queue. 1924 1925 @type jobs: list 1926 @param jobs: List of all jobs 1927 1928 """ 1929 return self._EnqueueJobsUnlocked(jobs)
1930
1931 - def _EnqueueJobsUnlocked(self, jobs):
1932 """Helper function to add jobs to worker pool's queue. 1933 1934 @type jobs: list 1935 @param jobs: List of all jobs 1936 1937 """ 1938 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" 1939 self._wpool.AddManyTasks([(job, ) for job in jobs], 1940 priority=[job.CalcPriority() for job in jobs], 1941 task_id=map(_GetIdAttr, jobs))
1942
1943 - def _GetJobStatusForDependencies(self, job_id):
1944 """Gets the status of a job for dependencies. 1945 1946 @type job_id: int 1947 @param job_id: Job ID 1948 @raise errors.JobLost: If job can't be found 1949 1950 """ 1951 # Not using in-memory cache as doing so would require an exclusive lock 1952 1953 # Try to load from disk 1954 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 1955 1956 if job: 1957 assert not job.writable, "Got writable job" # pylint: disable=E1101 1958 1959 if job: 1960 return job.CalcStatus() 1961 1962 raise errors.JobLost("Job %s not found" % job_id)
1963
1964 - def UpdateJobUnlocked(self, job, replicate=True):
1965 """Update a job's on disk storage. 1966 1967 After a job has been modified, this function needs to be called in 1968 order to write the changes to disk and replicate them to the other 1969 nodes. 1970 1971 @type job: L{_QueuedJob} 1972 @param job: the changed job 1973 @type replicate: boolean 1974 @param replicate: whether to replicate the change to remote nodes 1975 1976 """ 1977 if __debug__: 1978 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1979 assert (finalized ^ (job.end_timestamp is None)) 1980 assert job.writable, "Can't update read-only job" 1981 assert not job.archived, "Can't update archived job" 1982 1983 filename = self._GetJobPath(job.id) 1984 data = serializer.DumpJson(job.Serialize()) 1985 logging.debug("Writing job %s to %s", job.id, filename) 1986 self._UpdateJobQueueFile(filename, data, replicate)
1987
1988 - def HasJobBeenFinalized(self, job_id):
1989 """Checks if a job has been finalized. 1990 1991 @type job_id: int 1992 @param job_id: Job identifier 1993 @rtype: boolean 1994 @return: True if the job has been finalized, 1995 False if the timeout has been reached, 1996 None if the job doesn't exist 1997 1998 """ 1999 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2000 if job is not None: 2001 return job.CalcStatus() in constants.JOBS_FINALIZED 2002 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: 2003 # FIXME: The above variable is a temporary workaround until the Python job 2004 # queue is completely removed. When removing the job queue, also remove 2005 # the variable from LUClusterDestroy. 2006 return True 2007 else: 2008 return None
2009 2010 @locking.ssynchronized(_LOCK)
2011 - def CancelJob(self, job_id):
2012 """Cancels a job. 2013 2014 This will only succeed if the job has not started yet. 2015 2016 @type job_id: int 2017 @param job_id: job ID of job to be cancelled. 2018 2019 """ 2020 logging.info("Cancelling job %s", job_id) 2021 2022 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2023 2024 @locking.ssynchronized(_LOCK)
2025 - def ChangeJobPriority(self, job_id, priority):
2026 """Changes a job's priority. 2027 2028 @type job_id: int 2029 @param job_id: ID of the job whose priority should be changed 2030 @type priority: int 2031 @param priority: New priority 2032 2033 """ 2034 logging.info("Changing priority of job %s to %s", job_id, priority) 2035 2036 if priority not in constants.OP_PRIO_SUBMIT_VALID: 2037 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 2038 raise errors.GenericError("Invalid priority %s, allowed are %s" % 2039 (priority, allowed)) 2040 2041 def fn(job): 2042 (success, msg) = job.ChangePriority(priority) 2043 2044 if success: 2045 try: 2046 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) 2047 except workerpool.NoSuchTask: 2048 logging.debug("Job %s is not in workerpool at this time", job.id) 2049 2050 return (success, msg)
2051 2052 return self._ModifyJobUnlocked(job_id, fn)
2053
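A brief usage sketch; the job ID is made up and constants.OP_PRIO_HIGH is assumed to be one of the values in constants.OP_PRIO_SUBMIT_VALID:

  # Hypothetical caller; assumes `queue` is a JobQueue instance.
  try:
    (success, msg) = queue.ChangeJobPriority(1234, constants.OP_PRIO_HIGH)
  except errors.GenericError:
    raise                           # priority was not a valid submit priority
  if not success:
    logging.warning("Could not change priority of job 1234: %s", msg)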
2054 - def _ModifyJobUnlocked(self, job_id, mod_fn):
2055 """Modifies a job. 2056 2057 @type job_id: int 2058 @param job_id: Job ID 2059 @type mod_fn: callable 2060 @param mod_fn: Modifying function, receiving job object as parameter, 2061 returning tuple of (status boolean, message string) 2062 2063 """ 2064 job = self._LoadJobUnlocked(job_id) 2065 if not job: 2066 logging.debug("Job %s not found", job_id) 2067 return (False, "Job %s not found" % job_id) 2068 2069 assert job.writable, "Can't modify read-only job" 2070 assert not job.archived, "Can't modify archived job" 2071 2072 (success, msg) = mod_fn(job) 2073 2074 if success: 2075 # If the job was finalized (e.g. cancelled), this is the final write 2076 # allowed. The job can be archived anytime. 2077 self.UpdateJobUnlocked(job) 2078 2079 return (success, msg)
2080
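The mod_fn contract (take the job object, return a (success, message) tuple) is what CancelJob and ChangeJobPriority above rely on. A hypothetical modifier following the same contract, purely for illustration:

  # Hypothetical modifier following the mod_fn contract; not part of the module.
  def refuse_if_finalized(job):
    if job.CalcStatus() in constants.JOBS_FINALIZED:
      return (False, "Job %s is already finalized" % job.id)
    # ... mutate the in-memory job object here ...
    return (True, "Job %s updated" % job.id)

  # queue._ModifyJobUnlocked(1234, refuse_if_finalized) would load the job,
  # apply the function and, on success, write the job back to disk.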
2081 - def _ArchiveJobsUnlocked(self, jobs):
2082 """Archives jobs. 2083 2084 @type jobs: list of L{_QueuedJob} 2085 @param jobs: Job objects 2086 @rtype: int 2087 @return: Number of archived jobs 2088 2089 """ 2090 archive_jobs = [] 2091 rename_files = [] 2092 for job in jobs: 2093 assert job.writable, "Can't archive read-only job" 2094 assert not job.archived, "Can't cancel archived job" 2095 2096 if job.CalcStatus() not in constants.JOBS_FINALIZED: 2097 logging.debug("Job %s is not yet done", job.id) 2098 continue 2099 2100 archive_jobs.append(job) 2101 2102 old = self._GetJobPath(job.id) 2103 new = self._GetArchivedJobPath(job.id) 2104 rename_files.append((old, new)) 2105 2106 # TODO: What if 1..n files fail to rename? 2107 self._RenameFilesUnlocked(rename_files) 2108 2109 logging.debug("Successfully archived job(s) %s", 2110 utils.CommaJoin(job.id for job in archive_jobs)) 2111 2112 # Since we haven't quite checked, above, if we succeeded or failed renaming 2113 # the files, we update the cached queue size from the filesystem. When we 2114 # get around to fix the TODO: above, we can use the number of actually 2115 # archived jobs to fix this. 2116 self._UpdateQueueSizeUnlocked() 2117 return len(archive_jobs)
2118
2119 - def _Query(self, fields, qfilter):
2120 qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, 2121 namefield="id") 2122 2123 # Archived jobs are only looked at if the "archived" field is referenced 2124 # either as a requested field or in the filter. By default archived jobs 2125 # are ignored. 2126 include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) 2127 2128 job_ids = qobj.RequestedNames() 2129 2130 list_all = (job_ids is None) 2131 2132 if list_all: 2133 # Since files are added to/removed from the queue atomically, there's no 2134 # risk of getting the job ids in an inconsistent state. 2135 job_ids = self._GetJobIDsUnlocked(archived=include_archived) 2136 2137 jobs = [] 2138 2139 for job_id in job_ids: 2140 job = self.SafeLoadJobFromDisk(job_id, True, writable=False) 2141 if job is not None or not list_all: 2142 jobs.append((job_id, job)) 2143 2144 return (qobj, jobs, list_all)
2145
2146 - def QueryJobs(self, fields, qfilter):
2147 """Returns a list of jobs in queue. 2148 2149 @type fields: sequence 2150 @param fields: List of wanted fields 2151 @type qfilter: None or query2 filter (list) 2152 @param qfilter: Query filter 2153 2154 """ 2155 (qobj, ctx, _) = self._Query(fields, qfilter) 2156 2157 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2158
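A hedged usage sketch: the field names and the prefix-notation filter follow the query2 language, but the concrete values (and the availability of constants.JOB_STATUS_RUNNING) are assumptions:

  # Hypothetical query; assumes `queue` is a JobQueue instance and that
  # "id" and "status" are valid job query fields (see query.JOB_FIELDS).
  qfilter = ["=", "status", constants.JOB_STATUS_RUNNING]
  response = queue.QueryJobs(["id", "status"], qfilter)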
2159 - def OldStyleQueryJobs(self, job_ids, fields):
2160 """Returns a list of jobs in queue. 2161 2162 @type job_ids: list 2163 @param job_ids: sequence of job identifiers or None for all 2164 @type fields: list 2165 @param fields: names of fields to return 2166 @rtype: list 2167 @return: list one element per job, each element being list with 2168 the requested fields 2169 2170 """ 2171 # backwards compat: 2172 job_ids = [int(jid) for jid in job_ids] 2173 qfilter = qlang.MakeSimpleFilter("id", job_ids) 2174 2175 (qobj, ctx, _) = self._Query(fields, qfilter) 2176 2177 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2178 2179 @locking.ssynchronized(_LOCK)
2180 - def PrepareShutdown(self):
2181 """Prepare to stop the job queue. 2182 2183 Returns whether there are any jobs currently running. If the latter is the 2184 case, the job queue is not yet ready for shutdown. Once this function 2185 returns C{True} L{Shutdown} can be called without interfering with any job. 2186 2187 @rtype: bool 2188 @return: Whether there are any running jobs 2189 2190 """ 2191 return self._wpool.HasRunningTasks()
2192 2193 @locking.ssynchronized(_LOCK)
2194 - def Shutdown(self):
2195 """Stops the job queue. 2196 2197 This shutdowns all the worker threads an closes the queue. 2198 2199 """ 2200 self._wpool.TerminateWorkers()
2201
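Taken together, PrepareShutdown and Shutdown form a two-step protocol: poll until no jobs are running, then terminate the workers. A hedged sketch of that sequence (the polling interval and the `queue` instance are assumptions):

  # Hypothetical shutdown sequence; assumes `queue` is a JobQueue instance.
  while queue.PrepareShutdown():    # True means jobs are still running
    time.sleep(1.0)                 # wait for the worker pool to drain
  queue.Shutdown()                  # safe now: terminates the worker threads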