Package ganeti :: Module jqueue
[hide private]
[frames] | no frames]

Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import os 
  33  import logging 
  34  import errno 
  35  import re 
  36  import time 
  37  import weakref 
  38   
  39  try: 
  40    # pylint: disable-msg=E0611 
  41    from pyinotify import pyinotify 
  42  except ImportError: 
  43    import pyinotify 
  44   
  45  from ganeti import asyncnotifier 
  46  from ganeti import constants 
  47  from ganeti import serializer 
  48  from ganeti import workerpool 
  49  from ganeti import locking 
  50  from ganeti import opcodes 
  51  from ganeti import errors 
  52  from ganeti import mcpu 
  53  from ganeti import utils 
  54  from ganeti import jstore 
  55  from ganeti import rpc 
  56  from ganeti import runtime 
  57  from ganeti import netutils 
  58  from ganeti import compat 
  59   
  60   
  61  JOBQUEUE_THREADS = 25 
  62  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
  63   
  64  # member lock names to be passed to @ssynchronized decorator 
  65  _LOCK = "_lock" 
  66  _QUEUE = "_queue" 
class CancelJob(Exception):
  """Exception used to abort the processing of a job which is being canceled.

  """
73
def TimeStampNow():
  """Builds a timestamp for the current point in time.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
83
class _QueuedOpCode(object):
  """Wraps an opcode object together with its execution state.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar log: holds the execution log and consists of tuples
    of the form C{(log_serial, timestamp, level, message)}
  @ivar priority: the current priority of the opcode
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = self.exec_timestamp = self.end_timestamp = None

    # Only the initial value; the priority may change during the lifetime of
    # this opcode
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])

    # These keys must be present in the serialized state
    for name in ("status", "result", "log"):
      setattr(restored, name, state[name])

    # Timestamps may be absent and default to None in that case
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return restored

  def Serialize(self):
    """Converts this _QueuedOpCode into a dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    state = {"input": self.input.__getstate__()}
    for name in ("status", "result", "log", "start_timestamp",
                 "exec_timestamp", "end_timestamp", "priority"):
      state[name] = getattr(self, name)
    return state
158
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the highest log entry serial used so far by this job
  @ivar ops_iter: in-memory only (not serialized); starts out as None
  @ivar cur_opctx: in-memory only (not serialized); starts out as None
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise errors.GenericError: if no opcodes were given

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial is the highest serial found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped; if all opcodes are successful, the
        job is successful
      - the first opcode found to be canceling, failed or canceled
        determines the job status and stops the scan
      - otherwise the last opcode which is waiting for locks or running
        determines the job status
      - otherwise the job is queued

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # A job status is returned, hence the job (not opcode) constant
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    return [entry
            for op in self.ops
            for entry in op.log
            if entry[0] > serial]

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected only before the unfinished ones
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
452
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks handed to the opcode execution function for a single opcode.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Raises L{CancelJob} if the opcode has been marked as canceling.

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      return

    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    op = self._op

    assert op in self._job.ops
    assert op.status in (constants.OP_STATUS_WAITLOCK,
                         constants.OP_STATUS_CANCELING)

    # A cancel request takes precedence over starting the opcode
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Write out (and replicate) the new job state
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    job = self._job
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Takes either just the message, in which case the log type is
    C{constants.ELOG_MESSAGE}, or the log type followed by the message.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # Splitting the time makes serialization easier and doesn't lose
    # precision
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Raises CancelJob if cancelling was requested
    self._CheckCancel()
543
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job changed or can no longer
      change, None otherwise

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # A round-trip through the serializer can change types (e.g. tuple to
    # list) or lose precision. Doing it here applies the same modifications
    # the data received from the client went through; otherwise the comparison
    # below could fail without the data being significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # There's no point in waiting for a job which is no longer queued, waiting
    # for locks or running, as it won't change anymore
    unchanged = (status in (constants.JOB_STATUS_QUEUED,
                            constants.JOB_STATUS_RUNNING,
                            constants.JOB_STATUS_WAITLOCK) and
                 job_info == self._prev_job_info and
                 not (log_entries and
                      self._prev_log_serial != log_entries[0][0]))
    if unchanged:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
595
class _JobFileChangesWaiter(object):
  """Waits for changes to a job file using inotify.

  """
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    watch_manager = pyinotify.WatchManager()
    self._wm = watch_manager

    handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                   self._OnInotify, filename)
    self._inotify_handler = handler

    notifier = pyinotify.Notifier(watch_manager, default_proc_fun=handler)
    self._notifier = notifier

    try:
      handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Re-enables the handler if it's reported as no longer enabled.

    """
    if notifier_enabled:
      return
    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier
    # The timeout is given in seconds, check_events gets it multiplied by 1000
    have_events = notifier.check_events(timeout * 1000)
    if not have_events:
      return have_events

    notifier.read_events()
    notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
645
class _JobChangesWaiter(object):
  """Waiter for job changes which sets up the file waiter lazily.

  """
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filename = filename
    # Created on the first call to Wait
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: the inotify setup cost is avoided when the job file has
      # already changed. After setting up the waiter, return immediately so
      # the caller checks the job file again in case it changed since the
      # last check. This avoids a race condition.
      self._filewaiter = _JobFileChangesWaiter(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
683
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    """Loads the job and runs the change check on it.

    @raise errors.JobLost: if the job couldn't be loaded
    @raise utils.RetryAgain: if the check found no changes

    """
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is not None:
      return changes

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: the result of the change check; None if the job was lost or
      inotify failed; C{constants.JOB_NOTCHANGED} if the timeout expired

    """
    try:
      checker = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        retry_fn = compat.partial(self._CheckForChanges, job_load_fn, checker)
        return utils.Retry(retry_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter must be closed whether or not a change was found
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
736
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Errors which aren't instances of L{errors.GenericError} are converted to
  L{errors.OpExecError}, using their string representation, before being
  encoded.

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
748
class _TimeoutStrategyWrapper:
  """Wraps a timeout function so the next timeout can be looked at in advance.

  """
  def __init__(self, fn):
    """Initializes this class.

    @param fn: callable without arguments returning the next timeout

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is not None:
      return
    self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    The internal state is not advanced.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    (current, self._next) = (self._next, None)
    return current
780
class _OpExecContext:
  """Per-opcode execution context (log prefix, summary, lock timeouts).

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: the opcode wrapper; its C{input.Summary()} is stored
    @param index: index of the opcode
    @param log_prefix: prefix for log messages about this opcode
    @param timeout_strategy_factory: callable returning an object with a
      C{NextAttempt} method

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: whether the priority was increased

    """
    # Retries not exhausted yet (a timeout of None means the next round
    # would use a blocking acquire for locks)?
    if self._timeout_strategy.Peek() is not None:
      return False

    op = self.op

    # Already at the highest priority?
    if op.priority <= constants.OP_PRIO_HIGHEST:
      return False

    logging.debug("Increasing priority")
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
826
827 828 -class _JobProcessor(object):
829 - def __init__(self, queue, opexec_fn, job, 830 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831 """Initializes this class. 832 833 """ 834 self.queue = queue 835 self.opexec_fn = opexec_fn 836 self.job = job 837 self._timeout_strategy_factory = _timeout_strategy_factory
838 839 @staticmethod
840 - def _FindNextOpcode(job, timeout_strategy_factory):
841 """Locates the next opcode to run. 842 843 @type job: L{_QueuedJob} 844 @param job: Job object 845 @param timeout_strategy_factory: Callable to create new timeout strategy 846 847 """ 848 # Create some sort of a cache to speed up locating next opcode for future 849 # lookups 850 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for 851 # pending and one for processed ops. 852 if job.ops_iter is None: 853 job.ops_iter = enumerate(job.ops) 854 855 # Find next opcode to run 856 while True: 857 try: 858 (idx, op) = job.ops_iter.next() 859 except StopIteration: 860 raise errors.ProgrammerError("Called for a finished job") 861 862 if op.status == constants.OP_STATUS_RUNNING: 863 # Found an opcode already marked as running 864 raise errors.ProgrammerError("Called for job marked as running") 865 866 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), 867 timeout_strategy_factory) 868 869 if op.status == constants.OP_STATUS_CANCELED: 870 # Cancelled jobs are handled by the caller 871 assert not compat.any(i.status != constants.OP_STATUS_CANCELED 872 for i in job.ops[idx:]) 873 874 elif op.status in constants.OPS_FINALIZED: 875 # This is a job that was partially completed before master daemon 876 # shutdown, so it can be expected that some opcodes are already 877 # completed successfully (if any did error out, then the whole job 878 # should have been aborted and not resubmitted for processing). 879 logging.info("%s: opcode %s already processed, skipping", 880 opctx.log_prefix, opctx.summary) 881 continue 882 883 return opctx
884 885 @staticmethod
886 - def _MarkWaitlock(job, op):
887 """Marks an opcode as waiting for locks. 888 889 The job's start timestamp is also set if necessary. 890 891 @type job: L{_QueuedJob} 892 @param job: Job object 893 @type op: L{_QueuedOpCode} 894 @param op: Opcode object 895 896 """ 897 assert op in job.ops 898 899 op.status = constants.OP_STATUS_WAITLOCK 900 op.result = None 901 op.start_timestamp = TimeStampNow() 902 903 if job.start_timestamp is None: 904 job.start_timestamp = op.start_timestamp
905
906 - def _ExecOpCodeUnlocked(self, opctx):
907 """Processes one opcode and returns the result. 908 909 """ 910 op = opctx.op 911 912 assert op.status == constants.OP_STATUS_WAITLOCK 913 914 timeout = opctx.GetNextLockTimeout() 915 916 try: 917 # Make sure not to hold queue lock while calling ExecOpCode 918 result = self.opexec_fn(op.input, 919 _OpExecCallbacks(self.queue, self.job, op), 920 timeout=timeout, priority=op.priority) 921 except mcpu.LockAcquireTimeout: 922 assert timeout is not None, "Received timeout for blocking acquire" 923 logging.debug("Couldn't acquire locks in %0.6fs", timeout) 924 925 assert op.status in (constants.OP_STATUS_WAITLOCK, 926 constants.OP_STATUS_CANCELING) 927 928 # Was job cancelled while we were waiting for the lock? 929 if op.status == constants.OP_STATUS_CANCELING: 930 return (constants.OP_STATUS_CANCELING, None) 931 932 return (constants.OP_STATUS_QUEUED, None) 933 except CancelJob: 934 logging.exception("%s: Canceling job", opctx.log_prefix) 935 assert op.status == constants.OP_STATUS_CANCELING 936 return (constants.OP_STATUS_CANCELING, None) 937 except Exception, err: # pylint: disable-msg=W0703 938 logging.exception("%s: Caught exception in %s", 939 opctx.log_prefix, opctx.summary) 940 return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) 941 else: 942 logging.debug("%s: %s successful", 943 opctx.log_prefix, opctx.summary) 944 return (constants.OP_STATUS_SUCCESS, result)
945
946 - def __call__(self, _nextop_fn=None):
947 """Continues execution of a job. 948 949 @param _nextop_fn: Callback function for tests 950 @rtype: bool 951 @return: True if job is finished, False if processor needs to be called 952 again 953 954 """ 955 queue = self.queue 956 job = self.job 957 958 logging.debug("Processing job %s", job.id) 959 960 queue.acquire(shared=1) 961 try: 962 opcount = len(job.ops) 963 964 # Is a previous opcode still pending? 965 if job.cur_opctx: 966 opctx = job.cur_opctx 967 else: 968 if __debug__ and _nextop_fn: 969 _nextop_fn() 970 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) 971 972 op = opctx.op 973 974 # Consistency check 975 assert compat.all(i.status in (constants.OP_STATUS_QUEUED, 976 constants.OP_STATUS_CANCELED) 977 for i in job.ops[opctx.index:]) 978 979 assert op.status in (constants.OP_STATUS_QUEUED, 980 constants.OP_STATUS_WAITLOCK, 981 constants.OP_STATUS_CANCELED) 982 983 assert (op.priority <= constants.OP_PRIO_LOWEST and 984 op.priority >= constants.OP_PRIO_HIGHEST) 985 986 if op.status != constants.OP_STATUS_CANCELED: 987 # Prepare to start opcode 988 self._MarkWaitlock(job, op) 989 990 assert op.status == constants.OP_STATUS_WAITLOCK 991 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK 992 993 # Write to disk 994 queue.UpdateJobUnlocked(job) 995 996 logging.info("%s: opcode %s waiting for locks", 997 opctx.log_prefix, opctx.summary) 998 999 queue.release() 1000 try: 1001 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) 1002 finally: 1003 queue.acquire(shared=1) 1004 1005 op.status = op_status 1006 op.result = op_result 1007 1008 if op.status == constants.OP_STATUS_QUEUED: 1009 # Couldn't get locks in time 1010 assert not op.end_timestamp 1011 else: 1012 # Finalize opcode 1013 op.end_timestamp = TimeStampNow() 1014 1015 if op.status == constants.OP_STATUS_CANCELING: 1016 assert not compat.any(i.status != constants.OP_STATUS_CANCELING 1017 for i in job.ops[opctx.index:]) 1018 else: 1019 assert op.status in 
constants.OPS_FINALIZED 1020 1021 if op.status == constants.OP_STATUS_QUEUED: 1022 finalize = False 1023 1024 opctx.CheckPriorityIncrease() 1025 1026 # Keep around for another round 1027 job.cur_opctx = opctx 1028 1029 assert (op.priority <= constants.OP_PRIO_LOWEST and 1030 op.priority >= constants.OP_PRIO_HIGHEST) 1031 1032 # In no case must the status be finalized here 1033 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED 1034 1035 queue.UpdateJobUnlocked(job) 1036 1037 else: 1038 # Ensure all opcodes so far have been successful 1039 assert (opctx.index == 0 or 1040 compat.all(i.status == constants.OP_STATUS_SUCCESS 1041 for i in job.ops[:opctx.index])) 1042 1043 # Reset context 1044 job.cur_opctx = None 1045 1046 if op.status == constants.OP_STATUS_SUCCESS: 1047 finalize = False 1048 1049 elif op.status == constants.OP_STATUS_ERROR: 1050 # Ensure failed opcode has an exception as its result 1051 assert errors.GetEncodedError(job.ops[opctx.index].result) 1052 1053 to_encode = errors.OpExecError("Preceding opcode failed") 1054 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 1055 _EncodeOpError(to_encode)) 1056 finalize = True 1057 1058 # Consistency check 1059 assert compat.all(i.status == constants.OP_STATUS_ERROR and 1060 errors.GetEncodedError(i.result) 1061 for i in job.ops[opctx.index:]) 1062 1063 elif op.status == constants.OP_STATUS_CANCELING: 1064 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1065 "Job canceled by request") 1066 finalize = True 1067 1068 elif op.status == constants.OP_STATUS_CANCELED: 1069 finalize = True 1070 1071 else: 1072 raise errors.ProgrammerError("Unknown status '%s'" % op.status) 1073 1074 # Finalizing or last opcode? 1075 if finalize or opctx.index == (opcount - 1): 1076 # All opcodes have been run, finalize job 1077 job.end_timestamp = TimeStampNow() 1078 1079 # Write to disk. If the job status is final, this is the final write 1080 # allowed. Once the file has been written, it can be archived anytime. 
1081 queue.UpdateJobUnlocked(job) 1082 1083 if finalize or opctx.index == (opcount - 1): 1084 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) 1085 return True 1086 1087 return False 1088 finally: 1089 queue.release()
1090
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread driving queued jobs through the job processor.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Runs one processing round for a job.

    An L{mcpu.Processor} is created for the job and its C{ExecOpCode} method
    is handed to a L{_JobProcessor}. When the processor reports that the job
    is not finished yet, the task is deferred using the job's current
    priority so that it gets scheduled again.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @raise workerpool.DeferTask: if the job needs another processing round

    """
    owner_queue = job.queue
    assert owner_queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    processor = mcpu.Processor(owner_queue.context, job.id)
    finished = _JobProcessor(owner_queue, processor.ExecOpCode, job)()

    if finished:
      return

    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())
1116
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool specialised for processing jobs of a job queue.

  """
  def __init__(self, queue):
    """Creates the pool and remembers the queue it works for.

    The pool is named "JobQueue", uses L{JOBQUEUE_THREADS} workers and
    L{_JobQueueWorker} as worker class.

    @param queue: the job queue served by this pool; stored as C{self.queue}

    """
    pool_name = "JobQueue"
    worker_class = _JobQueueWorker
    super(_JobQueueWorkerPool, self).__init__(pool_name,
                                              JOBQUEUE_THREADS,
                                              worker_class)
    self.queue = queue
1127
1128 1129 -def _RequireOpenQueue(fn):
1130 """Decorator for "public" functions. 1131 1132 This function should be used for all 'public' functions. That is, 1133 functions usually called from other classes. Note that this should 1134 be applied only to methods (not plain functions), since it expects 1135 that the decorated function is called with a first argument that has 1136 a '_queue_filelock' argument. 1137 1138 @warning: Use this decorator only after locking.ssynchronized 1139 1140 Example:: 1141 @locking.ssynchronized(_LOCK) 1142 @_RequireOpenQueue 1143 def Example(self): 1144 pass 1145 1146 """ 1147 def wrapper(self, *args, **kwargs): 1148 # pylint: disable-msg=W0212 1149 assert self._queue_filelock is not None, "Queue should be open" 1150 return fn(self, *args, **kwargs)
1151 return wrapper 1152
class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names ("job-"
      followed by a job ID); group 1 is what L{_GetJobIDsUnlocked} extracts
      as the job ID

  """
  # Anchored on both ends so that only complete file names match
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
def __init__(self, context):
  """Constructor for JobQueue.

  Sets up the in-memory state of the queue (job cache, lock, node list,
  queue size, drain flag), takes the queue file lock, creates the worker
  pool and finally inspects the jobs found on disk, either restarting them
  (if they were queued) or aborting them (if they were already running).

  @type context: GanetiContext
  @param context: the context object for access to the configuration
      data and other ganeti objects

  """
  self.context = context
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = netutils.Hostname.GetSysName()

  # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
  # run concurrently with all other code holding it in shared mode,
  # including itself. Code not acquiring it at all must be safe to run
  # concurrently with code holding it in shared mode as well as with code
  # holding it exclusively.
  big_lock = locking.SharedLock("JobQueue")
  self._lock = big_lock

  self.acquire = big_lock.acquire
  self.release = big_lock.release

  # Initialize the queue, and acquire the filelock.
  # This ensures no other process is working on the job queue.
  self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Get initial list of nodes; only master candidates are tracked
  candidates = {}
  for node in self.context.cfg.GetAllNodesInfo().values():
    if node.master_candidate:
      candidates[node.name] = node.primary_ip

  # Remove master node
  candidates.pop(self._my_hostname, None)
  self._nodes = candidates

  # TODO: Check consistency across nodes

  self._queue_size = 0
  self._UpdateQueueSizeUnlocked()
  self._drained = self._IsQueueMarkedDrain()

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
  try:
    self._InspectQueue()
  except:
    # Don't leave worker threads around if the inspection failed
    self._wpool.TerminateWorkers()
    raise
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
  """Loads the whole job queue and resumes unfinished jobs.

  Queued jobs are scheduled again. Jobs found waiting for locks are reset
  to queued and scheduled again as well, while jobs found running or
  canceling get their unfinished opcodes marked as failed.

  This function needs the lock here because WorkerPool.AddTask() may start a
  job while we're still doing our work.

  """
  logging.info("Inspecting job queue")

  to_restart = []

  job_ids = self._GetJobIDsUnlocked()
  total = len(job_ids)
  last_report = time.time()

  # Statuses of jobs which were being worked on when the daemon stopped
  interrupted = (constants.JOB_STATUS_RUNNING,
                 constants.JOB_STATUS_WAITLOCK,
                 constants.JOB_STATUS_CANCELING)

  for (pos, job_id) in enumerate(job_ids):
    # Give an update every 1000 jobs or 10 seconds
    report_due = (pos % 1000 == 0 or
                  time.time() >= (last_report + 10.0) or
                  pos == (total - 1))
    if report_due:
      logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                   pos, total - 1, 100.0 * (pos + 1) / total)
      last_report = time.time()

    job = self._LoadJobUnlocked(job_id)

    # a failure in loading the job can cause 'None' to be returned
    if job is None:
      continue

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      to_restart.append(job)
      continue

    if status not in interrupted:
      continue

    logging.warning("Unfinished job %s found: %s", job.id, job)

    if status == constants.JOB_STATUS_WAITLOCK:
      # Restart job
      job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
      to_restart.append(job)
    else:
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            "Unclean master daemon shutdown")

    self.UpdateJobUnlocked(job)

  if to_restart:
    logging.info("Restarting %s jobs", len(to_restart))
    self._EnqueueJobs(to_restart)

  logging.info("Job queue inspection finished")
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node):
  """Register a new node with the queue.

  The queue directory on the node is purged first. If the node is a master
  candidate, all non-archived job files and the serial file are uploaded to
  it and the node is remembered for further replication; otherwise the node
  is dropped from the list of known nodes.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  name = node.name
  assert name != self._my_hostname

  # Clean queue directory on added node
  purge_result = rpc.RpcRunner.call_jobqueue_purge(name)
  purge_err = purge_result.fail_msg
  if purge_err:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    name, purge_err)

  if not node.master_candidate:
    # remove if existing, ignoring errors
    self._nodes.pop(name, None)
    # and skip the replication of the job ids
    return

  # Upload the whole queue excluding archived jobs
  to_upload = []
  for job_id in self._GetJobIDsUnlocked():
    to_upload.append(self._GetJobPath(job_id))

  # Upload current serial file
  to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

  for path in to_upload:
    # Read file content
    payload = utils.ReadFile(path)

    upload_result = rpc.RpcRunner.call_jobqueue_update([name],
                                                       [node.primary_ip],
                                                       path, payload)
    upload_err = upload_result[name].fail_msg
    if upload_err:
      logging.error("Failed to upload file %s to node %s: %s",
                    path, name, upload_err)

  self._nodes[name] = node.primary_ip
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def RemoveNode(self, node_name):
  """Callback called when removing nodes from the cluster.

  Unknown node names are silently ignored.

  @type node_name: str
  @param node_name: the name of the node to remove

  """
  if node_name in self._nodes:
    del self._nodes[node_name]
@staticmethod
def _CheckRpcResult(result, nodes, failmsg):
  """Verifies the status of an RPC call.

  Since we aim to keep consistency should this node (the current
  master) fail, every failed RPC is logged, and an additional error is
  logged when more than half of the nodes (counting the master as a
  success) failed.

  @param result: the data as returned from the rpc call, indexed by node
  @type nodes: list
  @param nodes: the list of nodes we made the call to
  @type failmsg: str
  @param failmsg: the identifier to be used for logging

  """
  failed_nodes = []
  ok_count = 0

  for name in nodes:
    node_result = result[name]
    err = node_result.fail_msg
    if not err:
      ok_count += 1
      continue

    failed_nodes.append(name)
    logging.error("RPC call %s (%s) failed on node %s: %s",
                  node_result.call, failmsg, name, err)

  # +1 for the master node
  if len(failed_nodes) > (ok_count + 1):
    # TODO: Handle failing nodes
    logging.error("More than half of the nodes failed")
1363
def _GetNodeIp(self):
  """Helper for returning the node name/ip list.

  Both lists are in the same order, i.e. the address at a given index
  belongs to the node name at the same index.

  @rtype: (list, list)
  @return: a tuple of two lists, the first one with the node
      names and the second one with the node addresses

  """
  # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
  names = []
  addresses = []
  for (name, address) in self._nodes.items():
    names.append(name)
    addresses.append(address)
  return names, addresses
1376
def _UpdateJobQueueFile(self, file_name, data, replicate):
  """Writes a file locally and then replicates it to all nodes.

  The local file is replaced first, owned by the master daemon's user and
  group; if requested the new contents are then sent to all other nodes
  known to the queue and the RPC results are checked.

  @type file_name: str
  @param file_name: the path of the file to be replicated
  @type data: str
  @param data: the new contents of the file
  @type replicate: boolean
  @param replicate: whether to spread the changes to the remote nodes

  """
  ents = runtime.GetEnts()
  utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                  gid=ents.masterd_gid)

  if not replicate:
    return

  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                  file_name, data)
  self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
1399
def _RenameFilesUnlocked(self, rename):
  """Renames files locally and then replicates the change.

  All files are first renamed in the local queue directory (creating
  missing target directories); the same list of renames is then sent to
  all the other nodes we have and the RPC results are checked.

  @type rename: list of (old, new)
  @param rename: List containing tuples mapping old to new names

  """
  # Rename them locally
  for (src, dst) in rename:
    utils.RenameFile(src, dst, mkdir=True)

  # ... and on all nodes
  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                  rename)
  self._CheckRpcResult(rpc_result, self._nodes,
                       "Renaming files (%r)" % rename)
@staticmethod
def _FormatJobID(job_id):
  """Convert a job ID to string format.

  Currently this just does C{str(job_id)} after validating the value, but
  it gives us a single place to change should the job ID format ever be
  modified.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: str
  @return: the formatted job id
  @raise errors.ProgrammerError: if the job ID is not numeric or negative

  """
  if isinstance(job_id, (int, long)):
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
    return str(job_id)

  raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
@classmethod
def _GetArchiveDirectory(cls, job_id):
  """Returns the archive directory for a job.

  Jobs are grouped into archive directories of
  L{JOBS_PER_ARCHIVE_DIRECTORY} jobs each; the directory name is the
  integer quotient of the job ID and that constant.

  @type job_id: str
  @param job_id: Job identifier
  @rtype: str
  @return: Directory name

  """
  # Explicit floor division: the directory name must stay an integer even
  # if true division is in effect
  return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1451
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster. The new
  highest serial is written to the serial file (and replicated) before the
  in-memory counter is updated, so a failed write doesn't consume serials.

  @type count: integer
  @param count: how many serials to return
  @rtype: list of str
  @return: exactly C{count} formatted job identifiers, in increasing order

  """
  assert count > 0
  # The first unused serial follows the last one recorded
  first_new = self._last_serial + 1
  # New number
  serial = self._last_serial + count

  # Write to file
  self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                           "%s\n" % serial, True)

  result = [self._FormatJobID(v)
            for v in range(first_new, serial + 1)]
  # Keep it only if we were able to write the file
  self._last_serial = serial

  assert len(result) == count

  return result
@staticmethod
def _GetJobPath(job_id):
  """Returns the job file for a given job id.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the job file inside the queue directory

  """
  file_name = "job-%s" % job_id
  return utils.PathJoin(constants.QUEUE_DIR, file_name)
@classmethod
def _GetArchivedJobPath(cls, job_id):
  """Returns the archived job file for a given job id.

  The file lives below the archive directory, in the subdirectory computed
  by L{_GetArchiveDirectory}.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the archived job file

  """
  subdir = cls._GetArchiveDirectory(job_id)
  file_name = "job-%s" % job_id
  return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1502
def _GetJobIDsUnlocked(self, sort=True):
  """Return all known job IDs.

  The method only looks at disk because it's a requirement that all
  jobs are present on disk (so in the _memcache we don't have any
  extra IDs). Files in the queue directory whose name doesn't match
  L{_RE_JOB_FILE} are ignored.

  @type sort: boolean
  @param sort: perform sorting on the returned job ids
  @rtype: list
  @return: the list of job IDs

  """
  matches = [self._RE_JOB_FILE.match(name)
             for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
  job_ids = [m.group(1) for m in matches if m]

  if sort:
    return utils.NiceSort(job_ids)

  return job_ids
1524
def _LoadJobUnlocked(self, job_id):
  """Loads a job from the disk or memory.

  Given a job id, this will return the cached job object if
  existing, or try to load the job from the disk. If loading from
  disk, it will also add the job to the cache. A job file which can't be
  parsed is moved to the archive and C{None} is returned.

  @param job_id: the job id
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  cached = self._memcache.get(job_id, None)
  if cached:
    logging.debug("Found job %s in memcache", job_id)
    return cached

  try:
    loaded = self._LoadJobFromDisk(job_id)
  except errors.JobFileCorrupted:
    src = self._GetJobPath(job_id)
    dst = self._GetArchivedJobPath(job_id)
    if src != dst:
      # non-archived case
      logging.exception("Can't parse job %s, will archive.", job_id)
      self._RenameFilesUnlocked([(src, dst)])
    else:
      # job already archived (future case)
      logging.exception("Can't parse job %s", job_id)
    return None

  if loaded is None:
    # No such job file
    return loaded

  self._memcache[job_id] = loaded
  logging.debug("Added job %s to the cache", job_id)
  return loaded
1561
1562 - def _LoadJobFromDisk(self, job_id):
1563 """Load the given job file from disk. 1564 1565 Given a job file, read, load and restore it in a _QueuedJob format. 1566 1567 @type job_id: string 1568 @param job_id: job identifier 1569 @rtype: L{_QueuedJob} or None 1570 @return: either None or the job object 1571 1572 """ 1573 filepath = self._GetJobPath(job_id) 1574 logging.debug("Loading job from %s", filepath) 1575 try: 1576 raw_data = utils.ReadFile(filepath) 1577 except EnvironmentError, err: 1578 if err.errno in (errno.ENOENT, ): 1579 return None 1580 raise 1581 1582 try: 1583 data = serializer.LoadJson(raw_data) 1584 job = _QueuedJob.Restore(self, data) 1585 except Exception, err: # pylint: disable-msg=W0703 1586 raise errors.JobFileCorrupted(err) 1587 1588 return job
1589
def SafeLoadJobFromDisk(self, job_id):
  """Load the given job file from disk, never raising on bad files.

  Works like L{_LoadJobFromDisk}, except that corrupted job files and
  environment errors are logged and reported as C{None} instead of being
  raised.

  @type job_id: string
  @param job_id: job identifier
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  try:
    job = self._LoadJobFromDisk(job_id)
  except (errors.JobFileCorrupted, EnvironmentError):
    logging.exception("Can't load/parse job %s", job_id)
    job = None

  return job
@staticmethod
def _IsQueueMarkedDrain():
  """Check if the queue is marked for drain.

  This currently uses the queue drain file, which makes it a
  per-node flag. In the future this can be moved to the config file.

  @rtype: boolean
  @return: True if the job queue is marked for draining

  """
  drain_file = constants.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
1621
def _UpdateQueueSizeUnlocked(self):
  """Update the cached queue size.

  The size is the number of job files currently in the queue directory.

  """
  job_ids = self._GetJobIDsUnlocked(sort=False)
  self._queue_size = len(job_ids)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag):
  """Sets the drain flag for the queue.

  The flag is stored as an (empty) drain file, which is created or removed
  as needed; the cached flag is updated accordingly.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @rtype: boolean
  @return: always True

  """
  ents = runtime.GetEnts()

  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                    uid=ents.masterd_uid, gid=ents.masterd_gid)

  self._drained = drain_flag

  return True
@_RequireOpenQueue
def _SubmitJobUnlocked(self, job_id, ops):
  """Create and store a new job.

  The job is validated, written to disk (and replicated) and added to the
  job cache. Handing it over to the worker pool is left to the caller.

  @type job_id: job ID
  @param job_id: the job ID for the new job
  @type ops: list
  @param ops: The list of OpCodes that will become the new job.
  @rtype: L{_QueuedJob}
  @return: the job object to be queued
  @raise errors.JobQueueDrainError: if the job queue is marked for draining
  @raise errors.JobQueueFull: if the job queue has too many jobs in it
  @raise errors.GenericError: If an opcode is not valid

  """
  # Ok when sharing the big job queue lock, as the drain file is created when
  # the lock is exclusive.
  if self._drained:
    raise errors.JobQueueDrainError("Job queue is drained, refusing job")

  if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
    raise errors.JobQueueFull()

  new_job = _QueuedJob(self, job_id, ops)

  # Check priority
  for (pos, opcode) in enumerate(new_job.ops):
    if opcode.priority in constants.OP_PRIO_SUBMIT_VALID:
      continue

    allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
    raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                              " are %s" % (pos, opcode.priority, allowed))

  # Write to disk
  self.UpdateJobUnlocked(new_job)

  self._queue_size += 1

  logging.debug("Adding new job %s to the cache", job_id)
  self._memcache[job_id] = new_job

  return new_job
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SubmitJob(self, ops):
  """Create and store a new job.

  A new job ID is allocated, the job is stored and then handed to the
  worker pool.

  @see: L{_SubmitJobUnlocked}
  @return: the ID of the new job

  """
  new_ids = self._NewSerialsUnlocked(1)
  job_id = new_ids[0]

  job = self._SubmitJobUnlocked(job_id, ops)
  self._EnqueueJobs([job])

  return job_id
1705 1706 @locking.ssynchronized(_LOCK) 1707 @_RequireOpenQueue
1708 - def SubmitManyJobs(self, jobs):
1709 """Create and store multiple jobs. 1710 1711 @see: L{_SubmitJobUnlocked} 1712 1713 """ 1714 results = [] 1715 added_jobs = [] 1716 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 1717 for job_id, ops in zip(all_job_ids, jobs): 1718 try: 1719 added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) 1720 status = True 1721 data = job_id 1722 except errors.GenericError, err: 1723 data = str(err) 1724 status = False 1725 results.append((status, data)) 1726 1727 self._EnqueueJobs(added_jobs) 1728 1729 return results
1730
def _EnqueueJobs(self, jobs):
  """Helper function to add jobs to worker pool's queue.

  Every job becomes one task, scheduled with the job's current priority.

  @type jobs: list
  @param jobs: List of all jobs

  """
  tasks = []
  priorities = []
  for job in jobs:
    tasks.append((job, ))
    priorities.append(job.CalcPriority())

  self._wpool.AddManyTasks(tasks, priority=priorities)
@_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True):
  """Update a job's on disk storage.

  After a job has been modified, this function needs to be called in
  order to write the changes to disk and replicate them to the other
  nodes. The job is stored in its serialized form as non-indented JSON.

  @type job: L{_QueuedJob}
  @param job: the changed job
  @type replicate: boolean
  @param replicate: whether to replicate the change to remote nodes

  """
  job_path = self._GetJobPath(job.id)
  serialized = serializer.DumpJson(job.Serialize(), indent=False)
  logging.debug("Writing job %s to %s", job.id, job_path)
  self._UpdateJobQueueFile(job_path, serialized, replicate)
1759
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Waits for changes in a job.

  The actual waiting is delegated to a L{_WaitForJobChangesHelper}, which
  is given the path of the job file and a function (re)loading the job
  from disk.

  @type job_id: string
  @param job_id: Job identifier
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @rtype: tuple (job info, log entries)
  @return: a tuple of the job information as required via
      the fields parameter, and the log entries as a list

      if the job has not changed and the timeout has expired,
      we instead return a special value,
      L{constants.JOB_NOTCHANGED}, which should be interpreted
      as such by the clients

  """
  loader = compat.partial(self.SafeLoadJobFromDisk, job_id)
  waiter = _WaitForJobChangesHelper()
  job_path = self._GetJobPath(job_id)

  return waiter(job_path, loader,
                fields, prev_job_info, prev_log_serial, timeout)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def CancelJob(self, job_id):
  """Cancels a job.

  The decision whether the job can be canceled is taken by the job's
  C{Cancel} method; on success the changed job is written to disk.

  @type job_id: string
  @param job_id: job ID of job to be cancelled.
  @rtype: tuple
  @return: C{(success, message)}

  """
  logging.info("Cancelling job %s", job_id)

  job = self._LoadJobUnlocked(job_id)
  if not job:
    logging.debug("Job %s not found", job_id)
    return (False, "Job %s not found" % job_id)

  (canceled, message) = job.Cancel()

  if canceled:
    self.UpdateJobUnlocked(job)

  return (canceled, message)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
  """Archives jobs.

  Only jobs in a finalized status are archived; all others are skipped.
  Archiving means moving the job file to its archive path, locally and on
  all other nodes.

  @type jobs: list of L{_QueuedJob}
  @param jobs: Job objects
  @rtype: int
  @return: Number of archived jobs

  """
  finished = []
  renames = []
  for job in jobs:
    if job.CalcStatus() in constants.JOBS_FINALIZED:
      finished.append(job)
      renames.append((self._GetJobPath(job.id),
                      self._GetArchivedJobPath(job.id)))
    else:
      logging.debug("Job %s is not yet done", job.id)

  # TODO: What if 1..n files fail to rename?
  self._RenameFilesUnlocked(renames)

  logging.debug("Successfully archived job(s) %s",
                utils.CommaJoin(job.id for job in finished))

  # Since we haven't quite checked, above, if we succeeded or failed renaming
  # the files, we update the cached queue size from the filesystem. When we
  # get around to fix the TODO: above, we can use the number of actually
  # archived jobs to fix this.
  self._UpdateQueueSizeUnlocked()
  return len(finished)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ArchiveJob(self, job_id):
  """Archives a job.

  This is just a wrapper over L{_ArchiveJobsUnlocked}.

  @type job_id: string
  @param job_id: Job ID of job to be archived.
  @rtype: bool
  @return: Whether job was archived

  """
  logging.info("Archiving job %s", job_id)

  job = self._LoadJobUnlocked(job_id)
  if job:
    archived = self._ArchiveJobsUnlocked([job])
    return archived == 1

  logging.debug("Job %s not found", job_id)
  return False
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout):
  """Archives all jobs based on age.

  The method will archive all jobs which are older than the age
  parameter. The age of a job is computed from its end timestamp; for jobs
  that don't have one, the start timestamp is used, and if that is missing
  too, the received timestamp. The special '-1' age will cause
  archival of all jobs (that are not running or queued).

  @type age: int
  @param age: the minimum age in seconds
  @type timeout: number
  @param timeout: time in seconds after which no further jobs are examined
  @rtype: tuple
  @return: C{(archived, unchecked)}, the number of archived jobs and the
      number of jobs which were not examined because the timeout expired

  """
  logging.info("Archiving jobs with age more than %s seconds", age)

  now = time.time()
  end_time = now + timeout
  archived_count = 0
  # Number of jobs examined so far
  last_touched = 0

  all_job_ids = self._GetJobIDsUnlocked()
  pending = []
  for idx, job_id in enumerate(all_job_ids):
    # Not optimal because jobs could be pending
    # TODO: Measure average duration for job archival and take number of
    # pending jobs into account.
    if time.time() > end_time:
      break

    # Only count the job once we know it's going to be examined, otherwise
    # the number of unchecked jobs is off by one after a timeout
    last_touched = idx + 1

    # Returns None if the job failed to load
    job = self._LoadJobUnlocked(job_id)
    if job:
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared against the age
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

  if pending:
    archived_count += self._ArchiveJobsUnlocked(pending)

  return (archived_count, len(all_job_ids) - last_touched)
1929
def QueryJobs(self, job_ids, fields):
  """Returns a list of jobs in queue.

  Jobs are always (re)loaded from disk. When specific job IDs are
  requested, jobs which can't be loaded are reported as C{None}; when
  listing all jobs they are left out.

  @type job_ids: list
  @param job_ids: sequence of job identifiers or None for all
  @type fields: list
  @param fields: names of fields to return
  @rtype: list
  @return: list one element per job, each element being list with
      the requested fields

  """
  list_all = not job_ids
  if list_all:
    # Since files are added to/removed from the queue atomically, there's no
    # risk of getting the job ids in an inconsistent state.
    job_ids = self._GetJobIDsUnlocked()

  result = []
  for job_id in job_ids:
    job = self.SafeLoadJobFromDisk(job_id)
    if job is None:
      if not list_all:
        result.append(None)
    else:
      result.append(job.GetInfo(fields))

  return result
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Shutdown(self):
  """Stops the job queue.

  This shuts down all the worker threads and closes the queue by releasing
  the queue file lock.

  """
  self._wpool.TerminateWorkers()

  filelock = self._queue_filelock
  filelock.Close()
  self._queue_filelock = None
1971