
Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import os 
  33  import logging 
  34  import errno 
  35  import re 
  36  import time 
  37  import weakref 
  38   
  39  try: 
  40    # pylint: disable-msg=E0611 
  41    from pyinotify import pyinotify 
  42  except ImportError: 
  43    import pyinotify 
  44   
  45  from ganeti import asyncnotifier 
  46  from ganeti import constants 
  47  from ganeti import serializer 
  48  from ganeti import workerpool 
  49  from ganeti import locking 
  50  from ganeti import opcodes 
  51  from ganeti import errors 
  52  from ganeti import mcpu 
  53  from ganeti import utils 
  54  from ganeti import jstore 
  55  from ganeti import rpc 
  56  from ganeti import runtime 
  57  from ganeti import netutils 
  58  from ganeti import compat 
  59   
  60   
  61  JOBQUEUE_THREADS = 25 
  62  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
  63   
  64  # member lock names to be passed to @ssynchronized decorator 
  65  _LOCK = "_lock" 
  66  _QUEUE = "_queue" 
67 68 69 -class CancelJob(Exception):
70 """Special exception to cancel a job. 71 72 """
73
74 75 -def TimeStampNow():
76 """Returns the current timestamp. 77 78 @rtype: tuple 79 @return: the current time in the (seconds, microseconds) format 80 81 """ 82 return utils.SplitTime(time.time())
83
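A minimal sketch of the (seconds, microseconds) format returned above, assuming utils.SplitTime splits a float timestamp roughly like this made-up equivalent::

  def _split_time_sketch(value):
    # Illustrative equivalent only, not the real utils.SplitTime
    seconds = int(value)
    return (seconds, int(round((value - seconds) * 1000000)))

  _split_time_sketch(1234567890.25)  # -> (1234567890, 250000)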
84 85 -class _QueuedOpCode(object):
86 """Encapsulates an opcode object. 87 88 @ivar log: holds the execution log and consists of tuples 89 of the form C{(log_serial, timestamp, level, message)} 90 @ivar input: the OpCode we encapsulate 91 @ivar status: the current status 92 @ivar result: the result of the LU execution 93 @ivar start_timestamp: timestamp for the start of the execution 94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 95 @ivar stop_timestamp: timestamp for the end of the execution 96 97 """ 98 __slots__ = ["input", "status", "result", "log", "priority", 99 "start_timestamp", "exec_timestamp", "end_timestamp", 100 "__weakref__"] 101
102 - def __init__(self, op):
103 """Constructor for the _QuededOpCode. 104 105 @type op: L{opcodes.OpCode} 106 @param op: the opcode we encapsulate 107 108 """ 109 self.input = op 110 self.status = constants.OP_STATUS_QUEUED 111 self.result = None 112 self.log = [] 113 self.start_timestamp = None 114 self.exec_timestamp = None 115 self.end_timestamp = None 116 117 # Get initial priority (it might change during the lifetime of this opcode) 118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119 120 @classmethod
121 - def Restore(cls, state):
122 """Restore the _QueuedOpCode from the serialized form. 123 124 @type state: dict 125 @param state: the serialized state 126 @rtype: _QueuedOpCode 127 @return: a new _QueuedOpCode instance 128 129 """ 130 obj = _QueuedOpCode.__new__(cls) 131 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 132 obj.status = state["status"] 133 obj.result = state["result"] 134 obj.log = state["log"] 135 obj.start_timestamp = state.get("start_timestamp", None) 136 obj.exec_timestamp = state.get("exec_timestamp", None) 137 obj.end_timestamp = state.get("end_timestamp", None) 138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) 139 return obj
140
141 - def Serialize(self):
142 """Serializes this _QueuedOpCode. 143 144 @rtype: dict 145 @return: the dictionary holding the serialized state 146 147 """ 148 return { 149 "input": self.input.__getstate__(), 150 "status": self.status, 151 "result": self.result, 152 "log": self.log, 153 "start_timestamp": self.start_timestamp, 154 "exec_timestamp": self.exec_timestamp, 155 "end_timestamp": self.end_timestamp, 156 "priority": self.priority, 157 }
158
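Serialize() and Restore() above are intended as inverses; a hedged round-trip sketch, assuming op is an existing _QueuedOpCode instance::

  state = op.Serialize()                 # plain dict with the keys listed above
  copy = _QueuedOpCode.Restore(state)
  assert copy.status == op.status and copy.priority == op.priority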
159 160 -class _QueuedJob(object):
161 """In-memory job representation. 162 163 This is what we use to track the user-submitted jobs. Locking must 164 be taken care of by users of this class. 165 166 @type queue: L{JobQueue} 167 @ivar queue: the parent queue 168 @ivar id: the job ID 169 @type ops: list 170 @ivar ops: the list of _QueuedOpCode that constitute the job 171 @type log_serial: int 172 @ivar log_serial: holds the index for the next log entry 173 @ivar received_timestamp: the timestamp for when the job was received 174 @ivar start_timestmap: the timestamp for start of execution 175 @ivar end_timestamp: the timestamp for end of execution 176 177 """ 178 # pylint: disable-msg=W0212 179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", 180 "received_timestamp", "start_timestamp", "end_timestamp", 181 "__weakref__"] 182
183 - def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob. 185 186 @type queue: L{JobQueue} 187 @param queue: our parent queue 188 @type job_id: job_id 189 @param job_id: our job id 190 @type ops: list 191 @param ops: the list of opcodes we hold, which will be encapsulated 192 in _QueuedOpCodes 193 194 """ 195 if not ops: 196 raise errors.GenericError("A job needs at least one opcode") 197 198 self.queue = queue 199 self.id = job_id 200 self.ops = [_QueuedOpCode(op) for op in ops] 201 self.log_serial = 0 202 self.received_timestamp = TimeStampNow() 203 self.start_timestamp = None 204 self.end_timestamp = None 205 206 self._InitInMemory(self)
207 208 @staticmethod
209 - def _InitInMemory(obj):
210 """Initializes in-memory variables. 211 212 """ 213 obj.ops_iter = None 214 obj.cur_opctx = None
215
216 - def __repr__(self):
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 218 "id=%s" % self.id, 219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 220 221 return "<%s at %#x>" % (" ".join(status), id(self))
222 223 @classmethod
224 - def Restore(cls, queue, state):
225 """Restore a _QueuedJob from serialized state: 226 227 @type queue: L{JobQueue} 228 @param queue: to which queue the restored job belongs 229 @type state: dict 230 @param state: the serialized state 231 @rtype: _JobQueue 232 @return: the restored _JobQueue instance 233 234 """ 235 obj = _QueuedJob.__new__(cls) 236 obj.queue = queue 237 obj.id = state["id"] 238 obj.received_timestamp = state.get("received_timestamp", None) 239 obj.start_timestamp = state.get("start_timestamp", None) 240 obj.end_timestamp = state.get("end_timestamp", None) 241 242 obj.ops = [] 243 obj.log_serial = 0 244 for op_state in state["ops"]: 245 op = _QueuedOpCode.Restore(op_state) 246 for log_entry in op.log: 247 obj.log_serial = max(obj.log_serial, log_entry[0]) 248 obj.ops.append(op) 249 250 cls._InitInMemory(obj) 251 252 return obj
253
254 - def Serialize(self):
255 """Serialize the _JobQueue instance. 256 257 @rtype: dict 258 @return: the serialized state 259 260 """ 261 return { 262 "id": self.id, 263 "ops": [op.Serialize() for op in self.ops], 264 "start_timestamp": self.start_timestamp, 265 "end_timestamp": self.end_timestamp, 266 "received_timestamp": self.received_timestamp, 267 }
268
269 - def CalcStatus(self):
270 """Compute the status of this job. 271 272 This function iterates over all the _QueuedOpCodes in the job and 273 based on their status, computes the job status. 274 275 The algorithm is: 276 - if we find a cancelled, or finished with error, the job 277 status will be the same 278 - otherwise, the last opcode with the status one of: 279 - waitlock 280 - canceling 281 - running 282 283 will determine the job status 284 285 - otherwise, it means either all opcodes are queued, or success, 286 and the job status will be the same 287 288 @return: the job status 289 290 """ 291 status = constants.JOB_STATUS_QUEUED 292 293 all_success = True 294 for op in self.ops: 295 if op.status == constants.OP_STATUS_SUCCESS: 296 continue 297 298 all_success = False 299 300 if op.status == constants.OP_STATUS_QUEUED: 301 pass 302 elif op.status == constants.OP_STATUS_WAITLOCK: 303 status = constants.JOB_STATUS_WAITLOCK 304 elif op.status == constants.OP_STATUS_RUNNING: 305 status = constants.JOB_STATUS_RUNNING 306 elif op.status == constants.OP_STATUS_CANCELING: 307 status = constants.JOB_STATUS_CANCELING 308 break 309 elif op.status == constants.OP_STATUS_ERROR: 310 status = constants.JOB_STATUS_ERROR 311 # The whole job fails if one opcode failed 312 break 313 elif op.status == constants.OP_STATUS_CANCELED: 314 status = constants.OP_STATUS_CANCELED 315 break 316 317 if all_success: 318 status = constants.JOB_STATUS_SUCCESS 319 320 return status
321
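An illustrative, stand-alone sketch of how the rules above combine opcode statuses into a job status; the status strings are stand-ins for the constants.OP_STATUS_*/JOB_STATUS_* values, not the real constants::

  def _calc_status_sketch(op_statuses):
    # Stand-in re-statement of the algorithm documented above
    status = "queued"
    all_success = True
    for s in op_statuses:
      if s == "success":
        continue
      all_success = False
      if s == "waitlock":
        status = "waiting"
      elif s == "running":
        status = "running"
      elif s in ("canceling", "error", "canceled"):
        return s
    if all_success:
      status = "success"
    return status

  _calc_status_sketch(["success", "running", "queued"])  # -> "running"
  _calc_status_sketch(["success", "error", "queued"])    # -> "error"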
322 - def CalcPriority(self):
323 """Gets the current priority for this job. 324 325 Only unfinished opcodes are considered. When all are done, the default 326 priority is used. 327 328 @rtype: int 329 330 """ 331 priorities = [op.priority for op in self.ops 332 if op.status not in constants.OPS_FINALIZED] 333 334 if not priorities: 335 # All opcodes are done, assume default priority 336 return constants.OP_PRIO_DEFAULT 337 338 return min(priorities)
339
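Illustrative example of the rule above (the numeric values are stand-ins, not the real constants): the job priority is the minimum, i.e. numerically lowest and therefore highest, priority among its unfinished opcodes::

  unfinished = [0, -5, 10]   # per-opcode priorities of unfinished opcodes
  min(unfinished)            # -> -5, the job's effective priority
  # with no unfinished opcodes, constants.OP_PRIO_DEFAULT is returned instead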
340 - def GetLogEntries(self, newer_than):
341 """Selectively returns the log entries. 342 343 @type newer_than: None or int 344 @param newer_than: if this is None, return all log entries, 345 otherwise return only the log entries with serial higher 346 than this value 347 @rtype: list 348 @return: the list of the log entries selected 349 350 """ 351 if newer_than is None: 352 serial = -1 353 else: 354 serial = newer_than 355 356 entries = [] 357 for op in self.ops: 358 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 359 360 return entries
361
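An illustrative filter equivalent to GetLogEntries(newer_than=2); the tuples are made-up examples of the (log_serial, timestamp, level, message) entries described for _QueuedOpCode.log::

  entries = [(1, (0, 0), "message", "first"),
             (2, (0, 1), "message", "second"),
             (3, (0, 2), "message", "third")]
  [e for e in entries if e[0] > 2]   # -> [(3, (0, 2), "message", "third")]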
362 - def GetInfo(self, fields):
363 """Returns information about a job. 364 365 @type fields: list 366 @param fields: names of fields to return 367 @rtype: list 368 @return: list with one element for each field 369 @raise errors.OpExecError: when an invalid field 370 has been passed 371 372 """ 373 row = [] 374 for fname in fields: 375 if fname == "id": 376 row.append(self.id) 377 elif fname == "status": 378 row.append(self.CalcStatus()) 379 elif fname == "priority": 380 row.append(self.CalcPriority()) 381 elif fname == "ops": 382 row.append([op.input.__getstate__() for op in self.ops]) 383 elif fname == "opresult": 384 row.append([op.result for op in self.ops]) 385 elif fname == "opstatus": 386 row.append([op.status for op in self.ops]) 387 elif fname == "oplog": 388 row.append([op.log for op in self.ops]) 389 elif fname == "opstart": 390 row.append([op.start_timestamp for op in self.ops]) 391 elif fname == "opexec": 392 row.append([op.exec_timestamp for op in self.ops]) 393 elif fname == "opend": 394 row.append([op.end_timestamp for op in self.ops]) 395 elif fname == "oppriority": 396 row.append([op.priority for op in self.ops]) 397 elif fname == "received_ts": 398 row.append(self.received_timestamp) 399 elif fname == "start_ts": 400 row.append(self.start_timestamp) 401 elif fname == "end_ts": 402 row.append(self.end_timestamp) 403 elif fname == "summary": 404 row.append([op.input.Summary() for op in self.ops]) 405 else: 406 raise errors.OpExecError("Invalid self query field '%s'" % fname) 407 return row
408
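A hedged usage sketch, assuming job is a _QueuedJob instance; the field names come from the method above, the returned values are made up for illustration::

  job.GetInfo(["id", "status", "opstatus"])
  # -> e.g. ["42", "running", ["success", "running", "queued"]]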
409 - def MarkUnfinishedOps(self, status, result):
410 """Mark unfinished opcodes with a given status and result. 411 412 This is an utility function for marking all running or waiting to 413 be run opcodes with a given status. Opcodes which are already 414 finalised are not changed. 415 416 @param status: a given opcode status 417 @param result: the opcode result 418 419 """ 420 not_marked = True 421 for op in self.ops: 422 if op.status in constants.OPS_FINALIZED: 423 assert not_marked, "Finalized opcodes found after non-finalized ones" 424 continue 425 op.status = status 426 op.result = result 427 not_marked = False
428
429 - def Finalize(self):
430 """Marks the job as finalized. 431 432 """ 433 self.end_timestamp = TimeStampNow()
434
435 - def Cancel(self):
436 """Marks job as canceled/-ing if possible. 437 438 @rtype: tuple; (bool, string) 439 @return: Boolean describing whether job was successfully canceled or marked 440 as canceling and a text message 441 442 """ 443 status = self.CalcStatus() 444 445 if status == constants.JOB_STATUS_QUEUED: 446 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 447 "Job canceled by request") 448 self.Finalize() 449 return (True, "Job %s canceled" % self.id) 450 451 elif status == constants.JOB_STATUS_WAITLOCK: 452 # The worker will notice the new status and cancel the job 453 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 454 return (True, "Job %s will be canceled" % self.id) 455 456 else: 457 logging.debug("Job %s is no longer waiting in the queue", self.id) 458 return (False, "Job %s is no longer waiting in the queue" % self.id)
459
460 461 -class _OpExecCallbacks(mcpu.OpExecCbBase):
462 - def __init__(self, queue, job, op):
463 """Initializes this class. 464 465 @type queue: L{JobQueue} 466 @param queue: Job queue 467 @type job: L{_QueuedJob} 468 @param job: Job object 469 @type op: L{_QueuedOpCode} 470 @param op: OpCode 471 472 """ 473 assert queue, "Queue is missing" 474 assert job, "Job is missing" 475 assert op, "Opcode is missing" 476 477 self._queue = queue 478 self._job = job 479 self._op = op
480
481 - def _CheckCancel(self):
482 """Raises an exception to cancel the job if asked to. 483 484 """ 485 # Cancel here if we were asked to 486 if self._op.status == constants.OP_STATUS_CANCELING: 487 logging.debug("Canceling opcode") 488 raise CancelJob()
489 490 @locking.ssynchronized(_QUEUE, shared=1)
491 - def NotifyStart(self):
492 """Mark the opcode as running, not lock-waiting. 493 494 This is called from the mcpu code as a notifier function, when the LU is 495 finally about to start the Exec() method. Of course, to have end-user 496 visible results, the opcode must be initially (before calling into 497 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. 498 499 """ 500 assert self._op in self._job.ops 501 assert self._op.status in (constants.OP_STATUS_WAITLOCK, 502 constants.OP_STATUS_CANCELING) 503 504 # Cancel here if we were asked to 505 self._CheckCancel() 506 507 logging.debug("Opcode is now running") 508 509 self._op.status = constants.OP_STATUS_RUNNING 510 self._op.exec_timestamp = TimeStampNow() 511 512 # And finally replicate the job status 513 self._queue.UpdateJobUnlocked(self._job)
514 515 @locking.ssynchronized(_QUEUE, shared=1)
516 - def _AppendFeedback(self, timestamp, log_type, log_msg):
517 """Internal feedback append function, with locks 518 519 """ 520 self._job.log_serial += 1 521 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 522 self._queue.UpdateJobUnlocked(self._job, replicate=False)
523
524 - def Feedback(self, *args):
525 """Append a log entry. 526 527 """ 528 assert len(args) < 3 529 530 if len(args) == 1: 531 log_type = constants.ELOG_MESSAGE 532 log_msg = args[0] 533 else: 534 (log_type, log_msg) = args 535 536 # The time is split to make serialization easier and not lose 537 # precision. 538 timestamp = utils.SplitTime(time.time()) 539 self._AppendFeedback(timestamp, log_type, log_msg)
540
541 - def CheckCancel(self):
542 """Check whether job has been cancelled. 543 544 """ 545 assert self._op.status in (constants.OP_STATUS_WAITLOCK, 546 constants.OP_STATUS_CANCELING) 547 548 # Cancel here if we were asked to 549 self._CheckCancel()
550
551 552 -class _JobChangesChecker(object):
553 - def __init__(self, fields, prev_job_info, prev_log_serial):
554 """Initializes this class. 555 556 @type fields: list of strings 557 @param fields: Fields requested by LUXI client 558 @type prev_job_info: string 559 @param prev_job_info: previous job info, as passed by the LUXI client 560 @type prev_log_serial: string 561 @param prev_log_serial: previous job serial, as passed by the LUXI client 562 563 """ 564 self._fields = fields 565 self._prev_job_info = prev_job_info 566 self._prev_log_serial = prev_log_serial
567
568 - def __call__(self, job):
569 """Checks whether job has changed. 570 571 @type job: L{_QueuedJob} 572 @param job: Job object 573 574 """ 575 status = job.CalcStatus() 576 job_info = job.GetInfo(self._fields) 577 log_entries = job.GetLogEntries(self._prev_log_serial) 578 579 # Serializing and deserializing data can cause type changes (e.g. from 580 # tuple to list) or precision loss. We're doing it here so that we get 581 # the same modifications as the data received from the client. Without 582 # this, the comparison afterwards might fail without the data being 583 # significantly different. 584 # TODO: we just deserialized from disk, investigate how to make sure that 585 # the job info and log entries are compatible to avoid this further step. 586 # TODO: Doing something like in testutils.py:UnifyValueType might be more 587 # efficient, though floats will be tricky 588 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 589 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 590 591 # Don't even try to wait if the job is no longer running, there will be 592 # no changes. 593 if (status not in (constants.JOB_STATUS_QUEUED, 594 constants.JOB_STATUS_RUNNING, 595 constants.JOB_STATUS_WAITLOCK) or 596 job_info != self._prev_job_info or 597 (log_entries and self._prev_log_serial != log_entries[0][0])): 598 logging.debug("Job %s changed", job.id) 599 return (job_info, log_entries) 600 601 return None
602
603 604 -class _JobFileChangesWaiter(object):
605 - def __init__(self, filename):
606 """Initializes this class. 607 608 @type filename: string 609 @param filename: Path to job file 610 @raises errors.InotifyError: if the notifier cannot be setup 611 612 """ 613 self._wm = pyinotify.WatchManager() 614 self._inotify_handler = \ 615 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 616 self._notifier = \ 617 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 618 try: 619 self._inotify_handler.enable() 620 except Exception: 621 # pyinotify doesn't close file descriptors automatically 622 self._notifier.stop() 623 raise
624
625 - def _OnInotify(self, notifier_enabled):
626 """Callback for inotify. 627 628 """ 629 if not notifier_enabled: 630 self._inotify_handler.enable()
631
632 - def Wait(self, timeout):
633 """Waits for the job file to change. 634 635 @type timeout: float 636 @param timeout: Timeout in seconds 637 @return: Whether there have been events 638 639 """ 640 assert timeout >= 0 641 have_events = self._notifier.check_events(timeout * 1000) 642 if have_events: 643 self._notifier.read_events() 644 self._notifier.process_events() 645 return have_events
646
647 - def Close(self):
648 """Closes underlying notifier and its file descriptor. 649 650 """ 651 self._notifier.stop()
652
653 654 -class _JobChangesWaiter(object):
655 - def __init__(self, filename):
656 """Initializes this class. 657 658 @type filename: string 659 @param filename: Path to job file 660 661 """ 662 self._filewaiter = None 663 self._filename = filename
664
665 - def Wait(self, timeout):
666 """Waits for a job to change. 667 668 @type timeout: float 669 @param timeout: Timeout in seconds 670 @return: Whether there have been events 671 672 """ 673 if self._filewaiter: 674 return self._filewaiter.Wait(timeout) 675 676 # Lazy setup: Avoid inotify setup cost when job file has already changed. 677 # If this point is reached, return immediately and let caller check the job 678 # file again in case there were changes since the last check. This avoids a 679 # race condition. 680 self._filewaiter = _JobFileChangesWaiter(self._filename) 681 682 return True
683
684 - def Close(self):
685 """Closes underlying waiter. 686 687 """ 688 if self._filewaiter: 689 self._filewaiter.Close()
690
691 692 -class _WaitForJobChangesHelper(object):
693 """Helper class using inotify to wait for changes in a job file. 694 695 This class takes a previous job status and serial, and alerts the client when 696 the current job status has changed. 697 698 """ 699 @staticmethod
700 - def _CheckForChanges(job_load_fn, check_fn):
701 job = job_load_fn() 702 if not job: 703 raise errors.JobLost() 704 705 result = check_fn(job) 706 if result is None: 707 raise utils.RetryAgain() 708 709 return result
710
711 - def __call__(self, filename, job_load_fn, 712 fields, prev_job_info, prev_log_serial, timeout):
713 """Waits for changes on a job. 714 715 @type filename: string 716 @param filename: File on which to wait for changes 717 @type job_load_fn: callable 718 @param job_load_fn: Function to load job 719 @type fields: list of strings 720 @param fields: Which fields to check for changes 721 @type prev_job_info: list or None 722 @param prev_job_info: Last job information returned 723 @type prev_log_serial: int 724 @param prev_log_serial: Last job message serial number 725 @type timeout: float 726 @param timeout: maximum time to wait in seconds 727 728 """ 729 try: 730 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 731 waiter = _JobChangesWaiter(filename) 732 try: 733 return utils.Retry(compat.partial(self._CheckForChanges, 734 job_load_fn, check_fn), 735 utils.RETRY_REMAINING_TIME, timeout, 736 wait_fn=waiter.Wait) 737 finally: 738 waiter.Close() 739 except (errors.InotifyError, errors.JobLost): 740 return None 741 except utils.RetryTimeout: 742 return constants.JOB_NOTCHANGED
743
744 745 -def _EncodeOpError(err):
746 """Encodes an error which occurred while processing an opcode. 747 748 """ 749 if isinstance(err, errors.GenericError): 750 to_encode = err 751 else: 752 to_encode = errors.OpExecError(str(err)) 753 754 return errors.EncodeException(to_encode)
755
756 757 -class _TimeoutStrategyWrapper:
758 - def __init__(self, fn):
759 """Initializes this class. 760 761 """ 762 self._fn = fn 763 self._next = None
764
765 - def _Advance(self):
766 """Gets the next timeout if necessary. 767 768 """ 769 if self._next is None: 770 self._next = self._fn()
771
772 - def Peek(self):
773 """Returns the next timeout. 774 775 """ 776 self._Advance() 777 return self._next
778
779 - def Next(self):
780 """Returns the current timeout and advances the internal state. 781 782 """ 783 self._Advance() 784 result = self._next 785 self._next = None 786 return result
787
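A hedged usage sketch of the wrapper above: Peek() returns the next timeout without consuming it, Next() consumes it, and a fresh value is fetched from the wrapped callable only when needed (Python 2 iterator API assumed)::

  import itertools
  wrapper = _TimeoutStrategyWrapper(itertools.count().next)
  wrapper.Peek()   # -> 0 (fetched and cached)
  wrapper.Peek()   # -> 0 (still cached)
  wrapper.Next()   # -> 0 (consumed)
  wrapper.Next()   # -> 1 (new value fetched)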
788 789 -class _OpExecContext:
790 - def __init__(self, op, index, log_prefix, timeout_strategy_factory):
791 """Initializes this class. 792 793 """ 794 self.op = op 795 self.index = index 796 self.log_prefix = log_prefix 797 self.summary = op.input.Summary() 798 799 self._timeout_strategy_factory = timeout_strategy_factory 800 self._ResetTimeoutStrategy()
801
802 - def _ResetTimeoutStrategy(self):
803 """Creates a new timeout strategy. 804 805 """ 806 self._timeout_strategy = \ 807 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
808
809 - def CheckPriorityIncrease(self):
810 """Checks whether priority can and should be increased. 811 812 Called when locks couldn't be acquired. 813 814 """ 815 op = self.op 816 817 # Exhausted all retries and next round should not use blocking acquire 818 # for locks? 819 if (self._timeout_strategy.Peek() is None and 820 op.priority > constants.OP_PRIO_HIGHEST): 821 logging.debug("Increasing priority") 822 op.priority -= 1 823 self._ResetTimeoutStrategy() 824 return True 825 826 return False
827
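An illustrative stand-in for the check above: the opcode priority is decremented numerically to raise it, and only while it is still above the highest priority; the bound below is an assumed placeholder, not the real constants.OP_PRIO_HIGHEST::

  PRIO_HIGHEST = -20   # placeholder value, assumption for illustration only

  def _maybe_increase_sketch(priority, retries_exhausted):
    # Mirrors the condition used above: bump priority once retries are gone
    if retries_exhausted and priority > PRIO_HIGHEST:
      return (priority - 1, True)   # higher priority; strategy would be reset
    return (priority, False)

  _maybe_increase_sketch(0, True)    # -> (-1, True)
  _maybe_increase_sketch(-20, True)  # -> (-20, False)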
828 - def GetNextLockTimeout(self):
829 """Returns the next lock acquire timeout. 830 831 """ 832 return self._timeout_strategy.Next()
833
834 835 -class _JobProcessor(object):
836 - def __init__(self, queue, opexec_fn, job, 837 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
838 """Initializes this class. 839 840 """ 841 self.queue = queue 842 self.opexec_fn = opexec_fn 843 self.job = job 844 self._timeout_strategy_factory = _timeout_strategy_factory
845 846 @staticmethod
847 - def _FindNextOpcode(job, timeout_strategy_factory):
848 """Locates the next opcode to run. 849 850 @type job: L{_QueuedJob} 851 @param job: Job object 852 @param timeout_strategy_factory: Callable to create new timeout strategy 853 854 """ 855 # Create some sort of a cache to speed up locating next opcode for future 856 # lookups 857 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 858 # pending and one for processed ops. 859 if job.ops_iter is None: 860 job.ops_iter = enumerate(job.ops) 861 862 # Find next opcode to run 863 while True: 864 try: 865 (idx, op) = job.ops_iter.next() 866 except StopIteration: 867 raise errors.ProgrammerError("Called for a finished job") 868 869 if op.status == constants.OP_STATUS_RUNNING: 870 # Found an opcode already marked as running 871 raise errors.ProgrammerError("Called for job marked as running") 872 873 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 874 timeout_strategy_factory) 875 876 if op.status not in constants.OPS_FINALIZED: 877 return opctx 878 879 # This is a job that was partially completed before master daemon 880 # shutdown, so it can be expected that some opcodes are already 881 # completed successfully (if any did error out, then the whole job 882 # should have been aborted and not resubmitted for processing). 883 logging.info("%s: opcode %s already processed, skipping", 884 opctx.log_prefix, opctx.summary)
885 886 @staticmethod
887 - def _MarkWaitlock(job, op):
888 """Marks an opcode as waiting for locks. 889 890 The job's start timestamp is also set if necessary. 891 892 @type job: L{_QueuedJob} 893 @param job: Job object 894 @type op: L{_QueuedOpCode} 895 @param op: Opcode object 896 897 """ 898 assert op in job.ops 899 assert op.status in (constants.OP_STATUS_QUEUED, 900 constants.OP_STATUS_WAITLOCK) 901 902 update = False 903 904 op.result = None 905 906 if op.status == constants.OP_STATUS_QUEUED: 907 op.status = constants.OP_STATUS_WAITLOCK 908 update = True 909 910 if op.start_timestamp is None: 911 op.start_timestamp = TimeStampNow() 912 update = True 913 914 if job.start_timestamp is None: 915 job.start_timestamp = op.start_timestamp 916 update = True 917 918 assert op.status == constants.OP_STATUS_WAITLOCK 919 920 return update
921
922 - def _ExecOpCodeUnlocked(self, opctx):
923 """Processes one opcode and returns the result. 924 925 """ 926 op = opctx.op 927 928 assert op.status == constants.OP_STATUS_WAITLOCK 929 930 timeout = opctx.GetNextLockTimeout() 931 932 try: 933 # Make sure not to hold queue lock while calling ExecOpCode 934 result = self.opexec_fn(op.input, 935 _OpExecCallbacks(self.queue, self.job, op), 936 timeout=timeout, priority=op.priority) 937 except mcpu.LockAcquireTimeout: 938 assert timeout is not None, "Received timeout for blocking acquire" 939 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 940 941 assert op.status in (constants.OP_STATUS_WAITLOCK, 942 constants.OP_STATUS_CANCELING) 943 944 # Was job cancelled while we were waiting for the lock? 945 if op.status == constants.OP_STATUS_CANCELING: 946 return (constants.OP_STATUS_CANCELING, None) 947 948 # Stay in waitlock while trying to re-acquire lock 949 return (constants.OP_STATUS_WAITLOCK, None) 950 except CancelJob: 951 logging.exception("%s: Canceling job", opctx.log_prefix) 952 assert op.status == constants.OP_STATUS_CANCELING 953 return (constants.OP_STATUS_CANCELING, None) 954 except Exception, err: # pylint: disable-msg=W0703 955 logging.exception("%s: Caught exception in %s", 956 opctx.log_prefix, opctx.summary) 957 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 958 else: 959 logging.debug("%s: %s successful", 960 opctx.log_prefix, opctx.summary) 961 return (constants.OP_STATUS_SUCCESS, result)
962
963 - def __call__(self, _nextop_fn=None):
964 """Continues execution of a job. 965 966 @param _nextop_fn: Callback function for tests 967 @rtype: bool 968 @return: True if job is finished, False if processor needs to be called 969 again 970 971 """ 972 queue = self.queue 973 job = self.job 974 975 logging.debug("Processing job %s", job.id) 976 977 queue.acquire(shared=1) 978 try: 979 opcount = len(job.ops) 980 981 # Don't do anything for finalized jobs 982 if job.CalcStatus() in constants.JOBS_FINALIZED: 983 return True 984 985 # Is a previous opcode still pending? 986 if job.cur_opctx: 987 opctx = job.cur_opctx 988 job.cur_opctx = None 989 else: 990 if __debug__ and _nextop_fn: 991 _nextop_fn() 992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 993 994 op = opctx.op 995 996 # Consistency check 997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 998 constants.OP_STATUS_CANCELING) 999 for i in job.ops[opctx.index + 1:]) 1000 1001 assert op.status in (constants.OP_STATUS_QUEUED, 1002 constants.OP_STATUS_WAITLOCK, 1003 constants.OP_STATUS_CANCELING) 1004 1005 assert (op.priority <= constants.OP_PRIO_LOWEST and 1006 op.priority >= constants.OP_PRIO_HIGHEST) 1007 1008 if op.status != constants.OP_STATUS_CANCELING: 1009 assert op.status in (constants.OP_STATUS_QUEUED, 1010 constants.OP_STATUS_WAITLOCK) 1011 1012 # Prepare to start opcode 1013 if self._MarkWaitlock(job, op): 1014 # Write to disk 1015 queue.UpdateJobUnlocked(job) 1016 1017 assert op.status == constants.OP_STATUS_WAITLOCK 1018 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK 1019 assert job.start_timestamp and op.start_timestamp 1020 1021 logging.info("%s: opcode %s waiting for locks", 1022 opctx.log_prefix, opctx.summary) 1023 1024 queue.release() 1025 try: 1026 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1027 finally: 1028 queue.acquire(shared=1) 1029 1030 op.status = op_status 1031 op.result = op_result 1032 1033 if op.status == constants.OP_STATUS_WAITLOCK: 1034 # Couldn't get locks in time 1035 assert not op.end_timestamp 1036 else: 1037 # Finalize opcode 1038 op.end_timestamp = TimeStampNow() 1039 1040 if op.status == constants.OP_STATUS_CANCELING: 1041 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1042 for i in job.ops[opctx.index:]) 1043 else: 1044 assert op.status in constants.OPS_FINALIZED 1045 1046 if op.status == constants.OP_STATUS_WAITLOCK: 1047 finalize = False 1048 1049 if opctx.CheckPriorityIncrease(): 1050 # Priority was changed, need to update on-disk file 1051 queue.UpdateJobUnlocked(job) 1052 1053 # Keep around for another round 1054 job.cur_opctx = opctx 1055 1056 assert (op.priority <= constants.OP_PRIO_LOWEST and 1057 op.priority >= constants.OP_PRIO_HIGHEST) 1058 1059 # In no case must the status be finalized here 1060 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK 1061 1062 else: 1063 # Ensure all opcodes so far have been successful 1064 assert (opctx.index == 0 or 1065 compat.all(i.status == constants.OP_STATUS_SUCCESS 1066 for i in job.ops[:opctx.index])) 1067 1068 # Reset context 1069 job.cur_opctx = None 1070 1071 if op.status == constants.OP_STATUS_SUCCESS: 1072 finalize = False 1073 1074 elif op.status == constants.OP_STATUS_ERROR: 1075 # Ensure failed opcode has an exception as its result 1076 assert errors.GetEncodedError(job.ops[opctx.index].result) 1077 1078 to_encode = errors.OpExecError("Preceding opcode failed") 1079 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1080 _EncodeOpError(to_encode)) 1081 finalize = True 1082 1083 # Consistency check 1084 
assert compat.all(i.status == constants.OP_STATUS_ERROR and 1085 errors.GetEncodedError(i.result) 1086 for i in job.ops[opctx.index:]) 1087 1088 elif op.status == constants.OP_STATUS_CANCELING: 1089 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1090 "Job canceled by request") 1091 finalize = True 1092 1093 else: 1094 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1095 1096 if opctx.index == (opcount - 1): 1097 # Finalize on last opcode 1098 finalize = True 1099 1100 if finalize: 1101 # All opcodes have been run, finalize job 1102 job.Finalize() 1103 1104 # Write to disk. If the job status is final, this is the final write 1105 # allowed. Once the file has been written, it can be archived anytime. 1106 queue.UpdateJobUnlocked(job) 1107 1108 if finalize: 1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1110 return True 1111 1112 return False 1113 finally: 1114 queue.release()
1115
1116 1117 -class _JobQueueWorker(workerpool.BaseWorker):
1118 """The actual job workers. 1119 1120 """
1121 - def RunTask(self, job): # pylint: disable-msg=W0221
1122 """Job executor. 1123 1124 This functions processes a job. It is closely tied to the L{_QueuedJob} and 1125 L{_QueuedOpCode} classes. 1126 1127 @type job: L{_QueuedJob} 1128 @param job: the job to be processed 1129 1130 """ 1131 queue = job.queue 1132 assert queue == self.pool.queue 1133 1134 self.SetTaskName("Job%s" % job.id) 1135 1136 proc = mcpu.Processor(queue.context, job.id) 1137 1138 if not _JobProcessor(queue, proc.ExecOpCode, job)(): 1139 # Schedule again 1140 raise workerpool.DeferTask(priority=job.CalcPriority())
1141
1142 1143 -class _JobQueueWorkerPool(workerpool.WorkerPool):
1144 """Simple class implementing a job-processing workerpool. 1145 1146 """
1147 - def __init__(self, queue):
1148 super(_JobQueueWorkerPool, self).__init__("JobQueue", 1149 JOBQUEUE_THREADS, 1150 _JobQueueWorker) 1151 self.queue = queue
1152
1153 1154 -def _RequireOpenQueue(fn):
1155 """Decorator for "public" functions. 1156 1157 This function should be used for all 'public' functions. That is, 1158 functions usually called from other classes. Note that this should 1159 be applied only to methods (not plain functions), since it expects 1160 that the decorated function is called with a first argument that has 1161 a '_queue_filelock' argument. 1162 1163 @warning: Use this decorator only after locking.ssynchronized 1164 1165 Example:: 1166 @locking.ssynchronized(_LOCK) 1167 @_RequireOpenQueue 1168 def Example(self): 1169 pass 1170 1171 """ 1172 def wrapper(self, *args, **kwargs): 1173 # pylint: disable-msg=W0212 1174 assert self._queue_filelock is not None, "Queue should be open" 1175 return fn(self, *args, **kwargs)
1176 return wrapper 1177
1178 1179 -class JobQueue(object):
1180 """Queue used to manage the jobs. 1181 1182 @cvar _RE_JOB_FILE: regex matching the valid job file names 1183 1184 """ 1185 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) 1186
1187 - def __init__(self, context):
1188 """Constructor for JobQueue. 1189 1190 The constructor will initialize the job queue object and then 1191 start loading the current jobs from disk, either for starting them 1192 (if they were queue) or for aborting them (if they were already 1193 running). 1194 1195 @type context: GanetiContext 1196 @param context: the context object for access to the configuration 1197 data and other ganeti objects 1198 1199 """ 1200 self.context = context 1201 self._memcache = weakref.WeakValueDictionary() 1202 self._my_hostname = netutils.Hostname.GetSysName() 1203 1204 # The Big JobQueue lock. If a code block or method acquires it in shared 1205 # mode safe it must guarantee concurrency with all the code acquiring it in 1206 # shared mode, including itself. In order not to acquire it at all 1207 # concurrency must be guaranteed with all code acquiring it in shared mode 1208 # and all code acquiring it exclusively. 1209 self._lock = locking.SharedLock("JobQueue") 1210 1211 self.acquire = self._lock.acquire 1212 self.release = self._lock.release 1213 1214 # Initialize the queue, and acquire the filelock. 1215 # This ensures no other process is working on the job queue. 1216 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 1217 1218 # Read serial file 1219 self._last_serial = jstore.ReadSerial() 1220 assert self._last_serial is not None, ("Serial file was modified between" 1221 " check in jstore and here") 1222 1223 # Get initial list of nodes 1224 self._nodes = dict((n.name, n.primary_ip) 1225 for n in self.context.cfg.GetAllNodesInfo().values() 1226 if n.master_candidate) 1227 1228 # Remove master node 1229 self._nodes.pop(self._my_hostname, None) 1230 1231 # TODO: Check consistency across nodes 1232 1233 self._queue_size = 0 1234 self._UpdateQueueSizeUnlocked() 1235 self._drained = self._IsQueueMarkedDrain() 1236 1237 # Setup worker pool 1238 self._wpool = _JobQueueWorkerPool(self) 1239 try: 1240 self._InspectQueue() 1241 except: 1242 self._wpool.TerminateWorkers() 1243 raise
1244 1245 @locking.ssynchronized(_LOCK) 1246 @_RequireOpenQueue
1247 - def _InspectQueue(self):
1248 """Loads the whole job queue and resumes unfinished jobs. 1249 1250 This function needs the lock here because WorkerPool.AddTask() may start a 1251 job while we're still doing our work. 1252 1253 """ 1254 logging.info("Inspecting job queue") 1255 1256 restartjobs = [] 1257 1258 all_job_ids = self._GetJobIDsUnlocked() 1259 jobs_count = len(all_job_ids) 1260 lastinfo = time.time() 1261 for idx, job_id in enumerate(all_job_ids): 1262 # Give an update every 1000 jobs or 10 seconds 1263 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or 1264 idx == (jobs_count - 1)): 1265 logging.info("Job queue inspection: %d/%d (%0.1f %%)", 1266 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) 1267 lastinfo = time.time() 1268 1269 job = self._LoadJobUnlocked(job_id) 1270 1271 # a failure in loading the job can cause 'None' to be returned 1272 if job is None: 1273 continue 1274 1275 status = job.CalcStatus() 1276 1277 if status == constants.JOB_STATUS_QUEUED: 1278 restartjobs.append(job) 1279 1280 elif status in (constants.JOB_STATUS_RUNNING, 1281 constants.JOB_STATUS_WAITLOCK, 1282 constants.JOB_STATUS_CANCELING): 1283 logging.warning("Unfinished job %s found: %s", job.id, job) 1284 1285 if status == constants.JOB_STATUS_WAITLOCK: 1286 # Restart job 1287 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) 1288 restartjobs.append(job) 1289 else: 1290 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1291 "Unclean master daemon shutdown") 1292 job.Finalize() 1293 1294 self.UpdateJobUnlocked(job) 1295 1296 if restartjobs: 1297 logging.info("Restarting %s jobs", len(restartjobs)) 1298 self._EnqueueJobs(restartjobs) 1299 1300 logging.info("Job queue inspection finished")
1301 1302 @locking.ssynchronized(_LOCK) 1303 @_RequireOpenQueue
1304 - def AddNode(self, node):
1305 """Register a new node with the queue. 1306 1307 @type node: L{objects.Node} 1308 @param node: the node object to be added 1309 1310 """ 1311 node_name = node.name 1312 assert node_name != self._my_hostname 1313 1314 # Clean queue directory on added node 1315 result = rpc.RpcRunner.call_jobqueue_purge(node_name) 1316 msg = result.fail_msg 1317 if msg: 1318 logging.warning("Cannot cleanup queue directory on node %s: %s", 1319 node_name, msg) 1320 1321 if not node.master_candidate: 1322 # remove if existing, ignoring errors 1323 self._nodes.pop(node_name, None) 1324 # and skip the replication of the job ids 1325 return 1326 1327 # Upload the whole queue excluding archived jobs 1328 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 1329 1330 # Upload current serial file 1331 files.append(constants.JOB_QUEUE_SERIAL_FILE) 1332 1333 for file_name in files: 1334 # Read file content 1335 content = utils.ReadFile(file_name) 1336 1337 result = rpc.RpcRunner.call_jobqueue_update([node_name], 1338 [node.primary_ip], 1339 file_name, content) 1340 msg = result[node_name].fail_msg 1341 if msg: 1342 logging.error("Failed to upload file %s to node %s: %s", 1343 file_name, node_name, msg) 1344 1345 self._nodes[node_name] = node.primary_ip
1346 1347 @locking.ssynchronized(_LOCK) 1348 @_RequireOpenQueue
1349 - def RemoveNode(self, node_name):
1350 """Callback called when removing nodes from the cluster. 1351 1352 @type node_name: str 1353 @param node_name: the name of the node to remove 1354 1355 """ 1356 self._nodes.pop(node_name, None)
1357 1358 @staticmethod
1359 - def _CheckRpcResult(result, nodes, failmsg):
1360 """Verifies the status of an RPC call. 1361 1362 Since we aim to keep consistency should this node (the current 1363 master) fail, we will log errors if our rpc fail, and especially 1364 log the case when more than half of the nodes fails. 1365 1366 @param result: the data as returned from the rpc call 1367 @type nodes: list 1368 @param nodes: the list of nodes we made the call to 1369 @type failmsg: str 1370 @param failmsg: the identifier to be used for logging 1371 1372 """ 1373 failed = [] 1374 success = [] 1375 1376 for node in nodes: 1377 msg = result[node].fail_msg 1378 if msg: 1379 failed.append(node) 1380 logging.error("RPC call %s (%s) failed on node %s: %s", 1381 result[node].call, failmsg, node, msg) 1382 else: 1383 success.append(node) 1384 1385 # +1 for the master node 1386 if (len(success) + 1) < len(failed): 1387 # TODO: Handle failing nodes 1388 logging.error("More than half of the nodes failed")
1389
1390 - def _GetNodeIp(self):
1391 """Helper for returning the node name/ip list. 1392 1393 @rtype: (list, list) 1394 @return: a tuple of two lists, the first one with the node 1395 names and the second one with the node addresses 1396 1397 """ 1398 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1399 name_list = self._nodes.keys() 1400 addr_list = [self._nodes[name] for name in name_list] 1401 return name_list, addr_list
1402
1403 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1404 """Writes a file locally and then replicates it to all nodes. 1405 1406 This function will replace the contents of a file on the local 1407 node and then replicate it to all the other nodes we have. 1408 1409 @type file_name: str 1410 @param file_name: the path of the file to be replicated 1411 @type data: str 1412 @param data: the new contents of the file 1413 @type replicate: boolean 1414 @param replicate: whether to spread the changes to the remote nodes 1415 1416 """ 1417 getents = runtime.GetEnts() 1418 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, 1419 gid=getents.masterd_gid) 1420 1421 if replicate: 1422 names, addrs = self._GetNodeIp() 1423 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) 1424 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1425
1426 - def _RenameFilesUnlocked(self, rename):
1427 """Renames a file locally and then replicate the change. 1428 1429 This function will rename a file in the local queue directory 1430 and then replicate this rename to all the other nodes we have. 1431 1432 @type rename: list of (old, new) 1433 @param rename: List containing tuples mapping old to new names 1434 1435 """ 1436 # Rename them locally 1437 for old, new in rename: 1438 utils.RenameFile(old, new, mkdir=True) 1439 1440 # ... and on all nodes 1441 names, addrs = self._GetNodeIp() 1442 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) 1443 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1444 1445 @staticmethod
1446 - def _FormatJobID(job_id):
1447 """Convert a job ID to string format. 1448 1449 Currently this just does C{str(job_id)} after performing some 1450 checks, but if we want to change the job id format this will 1451 abstract this change. 1452 1453 @type job_id: int or long 1454 @param job_id: the numeric job id 1455 @rtype: str 1456 @return: the formatted job id 1457 1458 """ 1459 if not isinstance(job_id, (int, long)): 1460 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) 1461 if job_id < 0: 1462 raise errors.ProgrammerError("Job ID %s is negative" % job_id) 1463 1464 return str(job_id)
1465 1466 @classmethod
1467 - def _GetArchiveDirectory(cls, job_id):
1468 """Returns the archive directory for a job. 1469 1470 @type job_id: str 1471 @param job_id: Job identifier 1472 @rtype: str 1473 @return: Directory name 1474 1475 """ 1476 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1477
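Illustrative examples of the directory mapping above, using the module-level JOBS_PER_ARCHIVE_DIRECTORY of 10000 and Python 2 integer division::

  str(int("12345") / 10000)   # -> "1"
  str(int("42") / 10000)      # -> "0"
  # so job 12345 is archived under ".../1/job-12345", job 42 under ".../0/job-42"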
1478 - def _NewSerialsUnlocked(self, count):
1479 """Generates a new job identifier. 1480 1481 Job identifiers are unique during the lifetime of a cluster. 1482 1483 @type count: integer 1484 @param count: how many serials to return 1485 @rtype: str 1486 @return: a string representing the job identifier. 1487 1488 """ 1489 assert count > 0 1490 # New number 1491 serial = self._last_serial + count 1492 1493 # Write to file 1494 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, 1495 "%s\n" % serial, True) 1496 1497 result = [self._FormatJobID(v) 1498 for v in range(self._last_serial + 1, serial + 1)] 1499 1500 # Keep it only if we were able to write the file 1501 self._last_serial = serial 1502 1503 assert len(result) == count 1504 1505 return result
1506 1507 @staticmethod
1508 - def _GetJobPath(job_id):
1509 """Returns the job file for a given job id. 1510 1511 @type job_id: str 1512 @param job_id: the job identifier 1513 @rtype: str 1514 @return: the path to the job file 1515 1516 """ 1517 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1518 1519 @classmethod
1520 - def _GetArchivedJobPath(cls, job_id):
1521 """Returns the archived job file for a give job id. 1522 1523 @type job_id: str 1524 @param job_id: the job identifier 1525 @rtype: str 1526 @return: the path to the archived job file 1527 1528 """ 1529 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, 1530 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1531
1532 - def _GetJobIDsUnlocked(self, sort=True):
1533 """Return all known job IDs. 1534 1535 The method only looks at disk because it's a requirement that all 1536 jobs are present on disk (so in the _memcache we don't have any 1537 extra IDs). 1538 1539 @type sort: boolean 1540 @param sort: perform sorting on the returned job ids 1541 @rtype: list 1542 @return: the list of job IDs 1543 1544 """ 1545 jlist = [] 1546 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): 1547 m = self._RE_JOB_FILE.match(filename) 1548 if m: 1549 jlist.append(m.group(1)) 1550 if sort: 1551 jlist = utils.NiceSort(jlist) 1552 return jlist
1553
1554 - def _LoadJobUnlocked(self, job_id):
1555 """Loads a job from the disk or memory. 1556 1557 Given a job id, this will return the cached job object if 1558 existing, or try to load the job from the disk. If loading from 1559 disk, it will also add the job to the cache. 1560 1561 @param job_id: the job id 1562 @rtype: L{_QueuedJob} or None 1563 @return: either None or the job object 1564 1565 """ 1566 job = self._memcache.get(job_id, None) 1567 if job: 1568 logging.debug("Found job %s in memcache", job_id) 1569 return job 1570 1571 try: 1572 job = self._LoadJobFromDisk(job_id) 1573 if job is None: 1574 return job 1575 except errors.JobFileCorrupted: 1576 old_path = self._GetJobPath(job_id) 1577 new_path = self._GetArchivedJobPath(job_id) 1578 if old_path == new_path: 1579 # job already archived (future case) 1580 logging.exception("Can't parse job %s", job_id) 1581 else: 1582 # non-archived case 1583 logging.exception("Can't parse job %s, will archive.", job_id) 1584 self._RenameFilesUnlocked([(old_path, new_path)]) 1585 return None 1586 1587 self._memcache[job_id] = job 1588 logging.debug("Added job %s to the cache", job_id) 1589 return job
1590
1591 - def _LoadJobFromDisk(self, job_id):
1592 """Load the given job file from disk. 1593 1594 Given a job file, read, load and restore it in a _QueuedJob format. 1595 1596 @type job_id: string 1597 @param job_id: job identifier 1598 @rtype: L{_QueuedJob} or None 1599 @return: either None or the job object 1600 1601 """ 1602 filepath = self._GetJobPath(job_id) 1603 logging.debug("Loading job from %s", filepath) 1604 try: 1605 raw_data = utils.ReadFile(filepath) 1606 except EnvironmentError, err: 1607 if err.errno in (errno.ENOENT, ): 1608 return None 1609 raise 1610 1611 try: 1612 data = serializer.LoadJson(raw_data) 1613 job = _QueuedJob.Restore(self, data) 1614 except Exception, err: # pylint: disable-msg=W0703 1615 raise errors.JobFileCorrupted(err) 1616 1617 return job
1618
1619 - def SafeLoadJobFromDisk(self, job_id):
1620 """Load the given job file from disk. 1621 1622 Given a job file, read, load and restore it in a _QueuedJob format. 1623 In case of error reading the job, it gets returned as None, and the 1624 exception is logged. 1625 1626 @type job_id: string 1627 @param job_id: job identifier 1628 @rtype: L{_QueuedJob} or None 1629 @return: either None or the job object 1630 1631 """ 1632 try: 1633 return self._LoadJobFromDisk(job_id) 1634 except (errors.JobFileCorrupted, EnvironmentError): 1635 logging.exception("Can't load/parse job %s", job_id) 1636 return None
1637 1638 @staticmethod
1639 - def _IsQueueMarkedDrain():
1640 """Check if the queue is marked from drain. 1641 1642 This currently uses the queue drain file, which makes it a 1643 per-node flag. In the future this can be moved to the config file. 1644 1645 @rtype: boolean 1646 @return: True of the job queue is marked for draining 1647 1648 """ 1649 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1650
1651 - def _UpdateQueueSizeUnlocked(self):
1652 """Update the queue size. 1653 1654 """ 1655 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1656 1657 @locking.ssynchronized(_LOCK) 1658 @_RequireOpenQueue
1659 - def SetDrainFlag(self, drain_flag):
1660 """Sets the drain flag for the queue. 1661 1662 @type drain_flag: boolean 1663 @param drain_flag: Whether to set or unset the drain flag 1664 1665 """ 1666 getents = runtime.GetEnts() 1667 1668 if drain_flag: 1669 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True, 1670 uid=getents.masterd_uid, gid=getents.masterd_gid) 1671 else: 1672 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) 1673 1674 self._drained = drain_flag 1675 1676 return True
1677 1678 @_RequireOpenQueue
1679 - def _SubmitJobUnlocked(self, job_id, ops):
1680 """Create and store a new job. 1681 1682 This enters the job into our job queue and also puts it on the new 1683 queue, in order for it to be picked up by the queue processors. 1684 1685 @type job_id: job ID 1686 @param job_id: the job ID for the new job 1687 @type ops: list 1688 @param ops: The list of OpCodes that will become the new job. 1689 @rtype: L{_QueuedJob} 1690 @return: the job object to be queued 1691 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1692 @raise errors.JobQueueFull: if the job queue has too many jobs in it 1693 @raise errors.GenericError: If an opcode is not valid 1694 1695 """ 1696 # Ok when sharing the big job queue lock, as the drain file is created when 1697 # the lock is exclusive. 1698 if self._drained: 1699 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1700 1701 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: 1702 raise errors.JobQueueFull() 1703 1704 job = _QueuedJob(self, job_id, ops) 1705 1706 # Check priority 1707 for idx, op in enumerate(job.ops): 1708 if op.priority not in constants.OP_PRIO_SUBMIT_VALID: 1709 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) 1710 raise errors.GenericError("Opcode %s has invalid priority %s, allowed" 1711 " are %s" % (idx, op.priority, allowed)) 1712 1713 # Write to disk 1714 self.UpdateJobUnlocked(job) 1715 1716 self._queue_size += 1 1717 1718 logging.debug("Adding new job %s to the cache", job_id) 1719 self._memcache[job_id] = job 1720 1721 return job
1722 1723 @locking.ssynchronized(_LOCK) 1724 @_RequireOpenQueue
1725 - def SubmitJob(self, ops):
1726 """Create and store a new job. 1727 1728 @see: L{_SubmitJobUnlocked} 1729 1730 """ 1731 job_id = self._NewSerialsUnlocked(1)[0] 1732 self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) 1733 return job_id
1734 1735 @locking.ssynchronized(_LOCK) 1736 @_RequireOpenQueue
1737 - def SubmitManyJobs(self, jobs):
1738 """Create and store multiple jobs. 1739 1740 @see: L{_SubmitJobUnlocked} 1741 1742 """ 1743 results = [] 1744 added_jobs = [] 1745 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 1746 for job_id, ops in zip(all_job_ids, jobs): 1747 try: 1748 added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) 1749 status = True 1750 data = job_id 1751 except errors.GenericError, err: 1752 data = str(err) 1753 status = False 1754 results.append((status, data)) 1755 1756 self._EnqueueJobs(added_jobs) 1757 1758 return results
1759
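A hedged usage sketch for the two submission entry points above; queue is assumed to be an open JobQueue instance and op a valid opcodes.OpCode instance (both assumptions, not defined in this module)::

  job_id = queue.SubmitJob([op])            # returns the new job ID as a string
  results = queue.SubmitManyJobs([[op], [op, op]])
  # results is a list of (status, data) tuples: (True, job_id) on success,
  # (False, error_message) when e.g. an opcode priority was invalid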
1760 - def _EnqueueJobs(self, jobs):
1761 """Helper function to add jobs to worker pool's queue. 1762 1763 @type jobs: list 1764 @param jobs: List of all jobs 1765 1766 """ 1767 self._wpool.AddManyTasks([(job, ) for job in jobs], 1768 priority=[job.CalcPriority() for job in jobs])
1769 1770 @_RequireOpenQueue
1771 - def UpdateJobUnlocked(self, job, replicate=True):
1772 """Update a job's on disk storage. 1773 1774 After a job has been modified, this function needs to be called in 1775 order to write the changes to disk and replicate them to the other 1776 nodes. 1777 1778 @type job: L{_QueuedJob} 1779 @param job: the changed job 1780 @type replicate: boolean 1781 @param replicate: whether to replicate the change to remote nodes 1782 1783 """ 1784 if __debug__: 1785 finalized = job.CalcStatus() in constants.JOBS_FINALIZED 1786 assert (finalized ^ (job.end_timestamp is None)) 1787 1788 filename = self._GetJobPath(job.id) 1789 data = serializer.DumpJson(job.Serialize(), indent=False) 1790 logging.debug("Writing job %s to %s", job.id, filename) 1791 self._UpdateJobQueueFile(filename, data, replicate)
1792
1793 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 1794 timeout):
1795 """Waits for changes in a job. 1796 1797 @type job_id: string 1798 @param job_id: Job identifier 1799 @type fields: list of strings 1800 @param fields: Which fields to check for changes 1801 @type prev_job_info: list or None 1802 @param prev_job_info: Last job information returned 1803 @type prev_log_serial: int 1804 @param prev_log_serial: Last job message serial number 1805 @type timeout: float 1806 @param timeout: maximum time to wait in seconds 1807 @rtype: tuple (job info, log entries) 1808 @return: a tuple of the job information as required via 1809 the fields parameter, and the log entries as a list 1810 1811 if the job has not changed and the timeout has expired, 1812 we instead return a special value, 1813 L{constants.JOB_NOTCHANGED}, which should be interpreted 1814 as such by the clients 1815 1816 """ 1817 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id) 1818 1819 helper = _WaitForJobChangesHelper() 1820 1821 return helper(self._GetJobPath(job_id), load_fn, 1822 fields, prev_job_info, prev_log_serial, timeout)
1823 1824 @locking.ssynchronized(_LOCK) 1825 @_RequireOpenQueue
1826 - def CancelJob(self, job_id):
1827 """Cancels a job. 1828 1829 This will only succeed if the job has not started yet. 1830 1831 @type job_id: string 1832 @param job_id: job ID of job to be cancelled. 1833 1834 """ 1835 logging.info("Cancelling job %s", job_id) 1836 1837 job = self._LoadJobUnlocked(job_id) 1838 if not job: 1839 logging.debug("Job %s not found", job_id) 1840 return (False, "Job %s not found" % job_id) 1841 1842 (success, msg) = job.Cancel() 1843 1844 if success: 1845 # If the job was finalized (e.g. cancelled), this is the final write 1846 # allowed. The job can be archived anytime. 1847 self.UpdateJobUnlocked(job) 1848 1849 return (success, msg)
1850 1851 @_RequireOpenQueue
1852 - def _ArchiveJobsUnlocked(self, jobs):
1853 """Archives jobs. 1854 1855 @type jobs: list of L{_QueuedJob} 1856 @param jobs: Job objects 1857 @rtype: int 1858 @return: Number of archived jobs 1859 1860 """ 1861 archive_jobs = [] 1862 rename_files = [] 1863 for job in jobs: 1864 if job.CalcStatus() not in constants.JOBS_FINALIZED: 1865 logging.debug("Job %s is not yet done", job.id) 1866 continue 1867 1868 archive_jobs.append(job) 1869 1870 old = self._GetJobPath(job.id) 1871 new = self._GetArchivedJobPath(job.id) 1872 rename_files.append((old, new)) 1873 1874 # TODO: What if 1..n files fail to rename? 1875 self._RenameFilesUnlocked(rename_files) 1876 1877 logging.debug("Successfully archived job(s) %s", 1878 utils.CommaJoin(job.id for job in archive_jobs)) 1879 1880 # Since we haven't quite checked, above, if we succeeded or failed renaming 1881 # the files, we update the cached queue size from the filesystem. When we 1882 # get around to fix the TODO: above, we can use the number of actually 1883 # archived jobs to fix this. 1884 self._UpdateQueueSizeUnlocked() 1885 return len(archive_jobs)
1886 1887 @locking.ssynchronized(_LOCK) 1888 @_RequireOpenQueue
1889 - def ArchiveJob(self, job_id):
1890 """Archives a job. 1891 1892 This is just a wrapper over L{_ArchiveJobsUnlocked}. 1893 1894 @type job_id: string 1895 @param job_id: Job ID of job to be archived. 1896 @rtype: bool 1897 @return: Whether job was archived 1898 1899 """ 1900 logging.info("Archiving job %s", job_id) 1901 1902 job = self._LoadJobUnlocked(job_id) 1903 if not job: 1904 logging.debug("Job %s not found", job_id) 1905 return False 1906 1907 return self._ArchiveJobsUnlocked([job]) == 1
1908 1909 @locking.ssynchronized(_LOCK) 1910 @_RequireOpenQueue
1911 - def AutoArchiveJobs(self, age, timeout):
1912 """Archives all jobs based on age. 1913 1914 The method will archive all jobs which are older than the age 1915 parameter. For jobs that don't have an end timestamp, the start 1916 timestamp will be considered. The special '-1' age will cause 1917 archival of all jobs (that are not running or queued). 1918 1919 @type age: int 1920 @param age: the minimum age in seconds 1921 1922 """ 1923 logging.info("Archiving jobs with age more than %s seconds", age) 1924 1925 now = time.time() 1926 end_time = now + timeout 1927 archived_count = 0 1928 last_touched = 0 1929 1930 all_job_ids = self._GetJobIDsUnlocked() 1931 pending = [] 1932 for idx, job_id in enumerate(all_job_ids): 1933 last_touched = idx + 1 1934 1935 # Not optimal because jobs could be pending 1936 # TODO: Measure average duration for job archival and take number of 1937 # pending jobs into account. 1938 if time.time() > end_time: 1939 break 1940 1941 # Returns None if the job failed to load 1942 job = self._LoadJobUnlocked(job_id) 1943 if job: 1944 if job.end_timestamp is None: 1945 if job.start_timestamp is None: 1946 job_age = job.received_timestamp 1947 else: 1948 job_age = job.start_timestamp 1949 else: 1950 job_age = job.end_timestamp 1951 1952 if age == -1 or now - job_age[0] > age: 1953 pending.append(job) 1954 1955 # Archive 10 jobs at a time 1956 if len(pending) >= 10: 1957 archived_count += self._ArchiveJobsUnlocked(pending) 1958 pending = [] 1959 1960 if pending: 1961 archived_count += self._ArchiveJobsUnlocked(pending) 1962 1963 return (archived_count, len(all_job_ids) - last_touched)
1964
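A hedged usage sketch of the call above; queue is assumed to be an open JobQueue instance::

  # archive finalized jobs older than one week, spending at most 60 seconds;
  # an age of -1 would archive all finalized jobs regardless of age
  (archived, remaining) = queue.AutoArchiveJobs(7 * 24 * 3600, 60)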
1965 - def QueryJobs(self, job_ids, fields):
1966 """Returns a list of jobs in queue. 1967 1968 @type job_ids: list 1969 @param job_ids: sequence of job identifiers or None for all 1970 @type fields: list 1971 @param fields: names of fields to return 1972 @rtype: list 1973 @return: list one element per job, each element being list with 1974 the requested fields 1975 1976 """ 1977 jobs = [] 1978 list_all = False 1979 if not job_ids: 1980 # Since files are added to/removed from the queue atomically, there's no 1981 # risk of getting the job ids in an inconsistent state. 1982 job_ids = self._GetJobIDsUnlocked() 1983 list_all = True 1984 1985 for job_id in job_ids: 1986 job = self.SafeLoadJobFromDisk(job_id) 1987 if job is not None: 1988 jobs.append(job.GetInfo(fields)) 1989 elif not list_all: 1990 jobs.append(None) 1991 1992 return jobs
1993 1994 @locking.ssynchronized(_LOCK) 1995 @_RequireOpenQueue
1996 - def Shutdown(self):
1997 """Stops the job queue. 1998 1999 This shutdowns all the worker threads an closes the queue. 2000 2001 """ 2002 self._wpool.TerminateWorkers() 2003 2004 self._queue_filelock.Close() 2005 self._queue_filelock = None
2006