
Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import os 
  33  import logging 
  34  import errno 
  35  import re 
  36  import time 
  37  import weakref 
  38   
  39  try: 
  40    # pylint: disable-msg=E0611 
  41    from pyinotify import pyinotify 
  42  except ImportError: 
  43    import pyinotify 
  44   
  45  from ganeti import asyncnotifier 
  46  from ganeti import constants 
  47  from ganeti import serializer 
  48  from ganeti import workerpool 
  49  from ganeti import locking 
  50  from ganeti import opcodes 
  51  from ganeti import errors 
  52  from ganeti import mcpu 
  53  from ganeti import utils 
  54  from ganeti import jstore 
  55  from ganeti import rpc 
  56  from ganeti import netutils 
  57  from ganeti import compat 
  58   
  59   
  60  JOBQUEUE_THREADS = 25 
  61  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
  62   
  63  # member lock names to be passed to @ssynchronized decorator 
  64  _LOCK = "_lock" 
  65  _QUEUE = "_queue" 
66 67 68 -class CancelJob(Exception):
69 """Special exception to cancel a job. 70 71 """
72
73 74 -def TimeStampNow():
75 """Returns the current timestamp. 76 77 @rtype: tuple 78 @return: the current time in the (seconds, microseconds) format 79 80 """ 81 return utils.SplitTime(time.time())
82
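The docstring above describes the timestamp as a (seconds, microseconds) pair. As an illustration only, here is a minimal stand-in for what utils.SplitTime is documented to return; this sketch assumes only the documented behaviour and is not the real implementation:

    # Hypothetical simplification of utils.SplitTime, for illustration only.
    def _split_time_sketch(value):
      seconds = int(value)                              # whole seconds
      microseconds = int((value - seconds) * 1000000)   # fractional part in usec
      return (seconds, microseconds)

    # _split_time_sketch(1234567890.25) == (1234567890, 250000)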
83 84 -class _QueuedOpCode(object):
85 """Encapsulates an opcode object. 86 87 @ivar log: holds the execution log and consists of tuples 88 of the form C{(log_serial, timestamp, level, message)} 89 @ivar input: the OpCode we encapsulate 90 @ivar status: the current status 91 @ivar result: the result of the LU execution 92 @ivar start_timestamp: timestamp for the start of the execution 93 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 94 @ivar stop_timestamp: timestamp for the end of the execution 95 96 """ 97 __slots__ = ["input", "status", "result", "log", 98 "start_timestamp", "exec_timestamp", "end_timestamp", 99 "__weakref__"] 100
101 - def __init__(self, op):
102 """Constructor for the _QuededOpCode. 103 104 @type op: L{opcodes.OpCode} 105 @param op: the opcode we encapsulate 106 107 """ 108 self.input = op 109 self.status = constants.OP_STATUS_QUEUED 110 self.result = None 111 self.log = [] 112 self.start_timestamp = None 113 self.exec_timestamp = None 114 self.end_timestamp = None
115 116 @classmethod
117 - def Restore(cls, state):
118 """Restore the _QueuedOpCode from the serialized form. 119 120 @type state: dict 121 @param state: the serialized state 122 @rtype: _QueuedOpCode 123 @return: a new _QueuedOpCode instance 124 125 """ 126 obj = _QueuedOpCode.__new__(cls) 127 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 128 obj.status = state["status"] 129 obj.result = state["result"] 130 obj.log = state["log"] 131 obj.start_timestamp = state.get("start_timestamp", None) 132 obj.exec_timestamp = state.get("exec_timestamp", None) 133 obj.end_timestamp = state.get("end_timestamp", None) 134 return obj
135
136 - def Serialize(self):
137 """Serializes this _QueuedOpCode. 138 139 @rtype: dict 140 @return: the dictionary holding the serialized state 141 142 """ 143 return { 144 "input": self.input.__getstate__(), 145 "status": self.status, 146 "result": self.result, 147 "log": self.log, 148 "start_timestamp": self.start_timestamp, 149 "exec_timestamp": self.exec_timestamp, 150 "end_timestamp": self.end_timestamp, 151 }
152
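Serialize() and Restore() above are intended to round-trip an opcode through the on-disk dictionary form. A hedged usage sketch, where "op" stands for any opcodes.OpCode instance the caller already has:

    qop = _QueuedOpCode(op)
    state = qop.Serialize()                  # plain dict, ready for the JSON serializer
    restored = _QueuedOpCode.Restore(state)
    assert restored.status == constants.OP_STATUS_QUEUED
    assert restored.log == []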
153 154 -class _QueuedJob(object):
155 """In-memory job representation. 156 157 This is what we use to track the user-submitted jobs. Locking must 158 be taken care of by users of this class. 159 160 @type queue: L{JobQueue} 161 @ivar queue: the parent queue 162 @ivar id: the job ID 163 @type ops: list 164 @ivar ops: the list of _QueuedOpCode that constitute the job 165 @type log_serial: int 166 @ivar log_serial: holds the index for the next log entry 167 @ivar received_timestamp: the timestamp for when the job was received 168 @ivar start_timestmap: the timestamp for start of execution 169 @ivar end_timestamp: the timestamp for end of execution 170 171 """ 172 # pylint: disable-msg=W0212 173 __slots__ = ["queue", "id", "ops", "log_serial", 174 "received_timestamp", "start_timestamp", "end_timestamp", 175 "__weakref__"] 176
177 - def __init__(self, queue, job_id, ops):
178 """Constructor for the _QueuedJob. 179 180 @type queue: L{JobQueue} 181 @param queue: our parent queue 182 @type job_id: job_id 183 @param job_id: our job id 184 @type ops: list 185 @param ops: the list of opcodes we hold, which will be encapsulated 186 in _QueuedOpCodes 187 188 """ 189 if not ops: 190 raise errors.GenericError("A job needs at least one opcode") 191 192 self.queue = queue 193 self.id = job_id 194 self.ops = [_QueuedOpCode(op) for op in ops] 195 self.log_serial = 0 196 self.received_timestamp = TimeStampNow() 197 self.start_timestamp = None 198 self.end_timestamp = None
199
200 - def __repr__(self):
201 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 202 "id=%s" % self.id, 203 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 204 205 return "<%s at %#x>" % (" ".join(status), id(self))
206 207 @classmethod
208 - def Restore(cls, queue, state):
209 """Restore a _QueuedJob from serialized state: 210 211 @type queue: L{JobQueue} 212 @param queue: to which queue the restored job belongs 213 @type state: dict 214 @param state: the serialized state 215 @rtype: _JobQueue 216 @return: the restored _JobQueue instance 217 218 """ 219 obj = _QueuedJob.__new__(cls) 220 obj.queue = queue 221 obj.id = state["id"] 222 obj.received_timestamp = state.get("received_timestamp", None) 223 obj.start_timestamp = state.get("start_timestamp", None) 224 obj.end_timestamp = state.get("end_timestamp", None) 225 226 obj.ops = [] 227 obj.log_serial = 0 228 for op_state in state["ops"]: 229 op = _QueuedOpCode.Restore(op_state) 230 for log_entry in op.log: 231 obj.log_serial = max(obj.log_serial, log_entry[0]) 232 obj.ops.append(op) 233 234 return obj
235
236 - def Serialize(self):
237 """Serialize the _JobQueue instance. 238 239 @rtype: dict 240 @return: the serialized state 241 242 """ 243 return { 244 "id": self.id, 245 "ops": [op.Serialize() for op in self.ops], 246 "start_timestamp": self.start_timestamp, 247 "end_timestamp": self.end_timestamp, 248 "received_timestamp": self.received_timestamp, 249 }
250
251 - def CalcStatus(self):
252 """Compute the status of this job. 253 254 This function iterates over all the _QueuedOpCodes in the job and 255 based on their status, computes the job status. 256 257 The algorithm is: 258 - if we find a cancelled, or finished with error, the job 259 status will be the same 260 - otherwise, the last opcode with the status one of: 261 - waitlock 262 - canceling 263 - running 264 265 will determine the job status 266 267 - otherwise, it means either all opcodes are queued, or success, 268 and the job status will be the same 269 270 @return: the job status 271 272 """ 273 status = constants.JOB_STATUS_QUEUED 274 275 all_success = True 276 for op in self.ops: 277 if op.status == constants.OP_STATUS_SUCCESS: 278 continue 279 280 all_success = False 281 282 if op.status == constants.OP_STATUS_QUEUED: 283 pass 284 elif op.status == constants.OP_STATUS_WAITLOCK: 285 status = constants.JOB_STATUS_WAITLOCK 286 elif op.status == constants.OP_STATUS_RUNNING: 287 status = constants.JOB_STATUS_RUNNING 288 elif op.status == constants.OP_STATUS_CANCELING: 289 status = constants.JOB_STATUS_CANCELING 290 break 291 elif op.status == constants.OP_STATUS_ERROR: 292 status = constants.JOB_STATUS_ERROR 293 # The whole job fails if one opcode failed 294 break 295 elif op.status == constants.OP_STATUS_CANCELED: 296 status = constants.OP_STATUS_CANCELED 297 break 298 299 if all_success: 300 status = constants.JOB_STATUS_SUCCESS 301 302 return status
303
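To make the precedence rules of CalcStatus() concrete, a few per-opcode status lists and the job status derived from them:

    # [SUCCESS, SUCCESS, SUCCESS]  -> JOB_STATUS_SUCCESS   (all_success stays True)
    # [SUCCESS, RUNNING, QUEUED]   -> JOB_STATUS_RUNNING
    # [SUCCESS, WAITLOCK, QUEUED]  -> JOB_STATUS_WAITLOCK
    # [SUCCESS, ERROR, QUEUED]     -> JOB_STATUS_ERROR     (the loop stops at the error)
    # [QUEUED, QUEUED]             -> JOB_STATUS_QUEUED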
304 - def GetLogEntries(self, newer_than):
305 """Selectively returns the log entries. 306 307 @type newer_than: None or int 308 @param newer_than: if this is None, return all log entries, 309 otherwise return only the log entries with serial higher 310 than this value 311 @rtype: list 312 @return: the list of the log entries selected 313 314 """ 315 if newer_than is None: 316 serial = -1 317 else: 318 serial = newer_than 319 320 entries = [] 321 for op in self.ops: 322 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 323 324 return entries
325
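A small worked example of the serial filter in GetLogEntries():

    # With log entries whose serials are 1..5 spread across the job's opcodes:
    #   job.GetLogEntries(None)  -> all five entries
    #   job.GetLogEntries(3)     -> only the entries with serials 4 and 5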
326 - def GetInfo(self, fields):
327 """Returns information about a job. 328 329 @type fields: list 330 @param fields: names of fields to return 331 @rtype: list 332 @return: list with one element for each field 333 @raise errors.OpExecError: when an invalid field 334 has been passed 335 336 """ 337 row = [] 338 for fname in fields: 339 if fname == "id": 340 row.append(self.id) 341 elif fname == "status": 342 row.append(self.CalcStatus()) 343 elif fname == "ops": 344 row.append([op.input.__getstate__() for op in self.ops]) 345 elif fname == "opresult": 346 row.append([op.result for op in self.ops]) 347 elif fname == "opstatus": 348 row.append([op.status for op in self.ops]) 349 elif fname == "oplog": 350 row.append([op.log for op in self.ops]) 351 elif fname == "opstart": 352 row.append([op.start_timestamp for op in self.ops]) 353 elif fname == "opexec": 354 row.append([op.exec_timestamp for op in self.ops]) 355 elif fname == "opend": 356 row.append([op.end_timestamp for op in self.ops]) 357 elif fname == "received_ts": 358 row.append(self.received_timestamp) 359 elif fname == "start_ts": 360 row.append(self.start_timestamp) 361 elif fname == "end_ts": 362 row.append(self.end_timestamp) 363 elif fname == "summary": 364 row.append([op.input.Summary() for op in self.ops]) 365 else: 366 raise errors.OpExecError("Invalid self query field '%s'" % fname) 367 return row
368
369 - def MarkUnfinishedOps(self, status, result):
370 """Mark unfinished opcodes with a given status and result. 371 372 This is an utility function for marking all running or waiting to 373 be run opcodes with a given status. Opcodes which are already 374 finalised are not changed. 375 376 @param status: a given opcode status 377 @param result: the opcode result 378 379 """ 380 not_marked = True 381 for op in self.ops: 382 if op.status in constants.OPS_FINALIZED: 383 assert not_marked, "Finalized opcodes found after non-finalized ones" 384 continue 385 op.status = status 386 op.result = result 387 not_marked = False
388
389 390 -class _OpExecCallbacks(mcpu.OpExecCbBase):
391 - def __init__(self, queue, job, op):
392 """Initializes this class. 393 394 @type queue: L{JobQueue} 395 @param queue: Job queue 396 @type job: L{_QueuedJob} 397 @param job: Job object 398 @type op: L{_QueuedOpCode} 399 @param op: OpCode 400 401 """ 402 assert queue, "Queue is missing" 403 assert job, "Job is missing" 404 assert op, "Opcode is missing" 405 406 self._queue = queue 407 self._job = job 408 self._op = op
409
410 - def _CheckCancel(self):
411 """Raises an exception to cancel the job if asked to. 412 413 """ 414 # Cancel here if we were asked to 415 if self._op.status == constants.OP_STATUS_CANCELING: 416 logging.debug("Canceling opcode") 417 raise CancelJob()
418 419 @locking.ssynchronized(_QUEUE, shared=1)
420 - def NotifyStart(self):
421 """Mark the opcode as running, not lock-waiting. 422 423 This is called from the mcpu code as a notifier function, when the LU is 424 finally about to start the Exec() method. Of course, to have end-user 425 visible results, the opcode must be initially (before calling into 426 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. 427 428 """ 429 assert self._op in self._job.ops 430 assert self._op.status in (constants.OP_STATUS_WAITLOCK, 431 constants.OP_STATUS_CANCELING) 432 433 # Cancel here if we were asked to 434 self._CheckCancel() 435 436 logging.debug("Opcode is now running") 437 438 self._op.status = constants.OP_STATUS_RUNNING 439 self._op.exec_timestamp = TimeStampNow() 440 441 # And finally replicate the job status 442 self._queue.UpdateJobUnlocked(self._job)
443 444 @locking.ssynchronized(_QUEUE, shared=1)
445 - def _AppendFeedback(self, timestamp, log_type, log_msg):
446 """Internal feedback append function, with locks 447 448 """ 449 self._job.log_serial += 1 450 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) 451 self._queue.UpdateJobUnlocked(self._job, replicate=False)
452
453 - def Feedback(self, *args):
454 """Append a log entry. 455 456 """ 457 assert len(args) < 3 458 459 if len(args) == 1: 460 log_type = constants.ELOG_MESSAGE 461 log_msg = args[0] 462 else: 463 (log_type, log_msg) = args 464 465 # The time is split to make serialization easier and not lose 466 # precision. 467 timestamp = utils.SplitTime(time.time()) 468 self._AppendFeedback(timestamp, log_type, log_msg)
469
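Both calling conventions accepted by Feedback() above, as a usage sketch; "cb" stands for the _OpExecCallbacks instance handed to the LU by the processor:

    cb.Feedback("syncing disks")                          # log type defaults to ELOG_MESSAGE
    cb.Feedback(constants.ELOG_MESSAGE, "sync finished")  # explicit (log_type, message) pair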
470 - def CheckCancel(self):
471 """Check whether job has been cancelled. 472 473 """ 474 assert self._op.status in (constants.OP_STATUS_WAITLOCK, 475 constants.OP_STATUS_CANCELING) 476 477 # Cancel here if we were asked to 478 self._CheckCancel()
479
480 481 -class _JobChangesChecker(object):
482 - def __init__(self, fields, prev_job_info, prev_log_serial):
483 """Initializes this class. 484 485 @type fields: list of strings 486 @param fields: Fields requested by LUXI client 487 @type prev_job_info: string 488 @param prev_job_info: previous job info, as passed by the LUXI client 489 @type prev_log_serial: string 490 @param prev_log_serial: previous job serial, as passed by the LUXI client 491 492 """ 493 self._fields = fields 494 self._prev_job_info = prev_job_info 495 self._prev_log_serial = prev_log_serial
496
497 - def __call__(self, job):
498 """Checks whether job has changed. 499 500 @type job: L{_QueuedJob} 501 @param job: Job object 502 503 """ 504 status = job.CalcStatus() 505 job_info = job.GetInfo(self._fields) 506 log_entries = job.GetLogEntries(self._prev_log_serial) 507 508 # Serializing and deserializing data can cause type changes (e.g. from 509 # tuple to list) or precision loss. We're doing it here so that we get 510 # the same modifications as the data received from the client. Without 511 # this, the comparison afterwards might fail without the data being 512 # significantly different. 513 # TODO: we just deserialized from disk, investigate how to make sure that 514 # the job info and log entries are compatible to avoid this further step. 515 # TODO: Doing something like in testutils.py:UnifyValueType might be more 516 # efficient, though floats will be tricky 517 job_info = serializer.LoadJson(serializer.DumpJson(job_info)) 518 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) 519 520 # Don't even try to wait if the job is no longer running, there will be 521 # no changes. 522 if (status not in (constants.JOB_STATUS_QUEUED, 523 constants.JOB_STATUS_RUNNING, 524 constants.JOB_STATUS_WAITLOCK) or 525 job_info != self._prev_job_info or 526 (log_entries and self._prev_log_serial != log_entries[0][0])): 527 logging.debug("Job %s changed", job.id) 528 return (job_info, log_entries) 529 530 return None
531
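The normalization round-trip mentioned in the comment above can be illustrated with the standard library alone (the module itself uses ganeti's serializer); the point is only that encoding and decoding turns tuples into lists, so freshly computed values compare equal to values that already travelled through the serializer:

    import json
    assert json.loads(json.dumps({"ts": (12, 500)})) == {"ts": [12, 500]}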
532 533 -class _JobFileChangesWaiter(object):
534 - def __init__(self, filename):
535 """Initializes this class. 536 537 @type filename: string 538 @param filename: Path to job file 539 @raises errors.InotifyError: if the notifier cannot be setup 540 541 """ 542 self._wm = pyinotify.WatchManager() 543 self._inotify_handler = \ 544 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) 545 self._notifier = \ 546 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) 547 try: 548 self._inotify_handler.enable() 549 except Exception: 550 # pyinotify doesn't close file descriptors automatically 551 self._notifier.stop() 552 raise
553
554 - def _OnInotify(self, notifier_enabled):
555 """Callback for inotify. 556 557 """ 558 if not notifier_enabled: 559 self._inotify_handler.enable()
560
561 - def Wait(self, timeout):
562 """Waits for the job file to change. 563 564 @type timeout: float 565 @param timeout: Timeout in seconds 566 @return: Whether there have been events 567 568 """ 569 assert timeout >= 0 570 have_events = self._notifier.check_events(timeout * 1000) 571 if have_events: 572 self._notifier.read_events() 573 self._notifier.process_events() 574 return have_events
575
576 - def Close(self):
577 """Closes underlying notifier and its file descriptor. 578 579 """ 580 self._notifier.stop()
581
582 583 -class _JobChangesWaiter(object):
584 - def __init__(self, filename):
585 """Initializes this class. 586 587 @type filename: string 588 @param filename: Path to job file 589 590 """ 591 self._filewaiter = None 592 self._filename = filename
593
594 - def Wait(self, timeout):
595 """Waits for a job to change. 596 597 @type timeout: float 598 @param timeout: Timeout in seconds 599 @return: Whether there have been events 600 601 """ 602 if self._filewaiter: 603 return self._filewaiter.Wait(timeout) 604 605 # Lazy setup: Avoid inotify setup cost when job file has already changed. 606 # If this point is reached, return immediately and let caller check the job 607 # file again in case there were changes since the last check. This avoids a 608 # race condition. 609 self._filewaiter = _JobFileChangesWaiter(self._filename) 610 611 return True
612
613 - def Close(self):
614 """Closes underlying waiter. 615 616 """ 617 if self._filewaiter: 618 self._filewaiter.Close()
619
620 621 -class _WaitForJobChangesHelper(object):
622 """Helper class using inotify to wait for changes in a job file. 623 624 This class takes a previous job status and serial, and alerts the client when 625 the current job status has changed. 626 627 """ 628 @staticmethod
629 - def _CheckForChanges(job_load_fn, check_fn):
630 job = job_load_fn() 631 if not job: 632 raise errors.JobLost() 633 634 result = check_fn(job) 635 if result is None: 636 raise utils.RetryAgain() 637 638 return result
639
640 - def __call__(self, filename, job_load_fn, 641 fields, prev_job_info, prev_log_serial, timeout):
642 """Waits for changes on a job. 643 644 @type filename: string 645 @param filename: File on which to wait for changes 646 @type job_load_fn: callable 647 @param job_load_fn: Function to load job 648 @type fields: list of strings 649 @param fields: Which fields to check for changes 650 @type prev_job_info: list or None 651 @param prev_job_info: Last job information returned 652 @type prev_log_serial: int 653 @param prev_log_serial: Last job message serial number 654 @type timeout: float 655 @param timeout: maximum time to wait in seconds 656 657 """ 658 try: 659 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) 660 waiter = _JobChangesWaiter(filename) 661 try: 662 return utils.Retry(compat.partial(self._CheckForChanges, 663 job_load_fn, check_fn), 664 utils.RETRY_REMAINING_TIME, timeout, 665 wait_fn=waiter.Wait) 666 finally: 667 waiter.Close() 668 except (errors.InotifyError, errors.JobLost): 669 return None 670 except utils.RetryTimeout: 671 return constants.JOB_NOTCHANGED
672
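A simplified sketch of the control flow above, written without the real utils.Retry helper (whose exact interface is not shown in this module); it assumes only that the waiter's Wait() blocks until the job file changes or the given time elapses:

    import time

    def _wait_for_changes_sketch(check_fn, waiter, job_load_fn, timeout):
      deadline = time.time() + timeout
      while True:
        job = job_load_fn()
        if job is None:
          return None                        # job lost (e.g. archived meanwhile)
        result = check_fn(job)
        if result is not None:
          return result                      # (job_info, log_entries)
        remaining = deadline - time.time()
        if remaining <= 0:
          return constants.JOB_NOTCHANGED    # timed out without any change
        waiter.Wait(remaining)               # block on inotify until the file changes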
673 674 -def _EncodeOpError(err):
675 """Encodes an error which occurred while processing an opcode. 676 677 """ 678 if isinstance(err, errors.GenericError): 679 to_encode = err 680 else: 681 to_encode = errors.OpExecError(str(err)) 682 683 return errors.EncodeException(to_encode)
684
685 686 -class _JobQueueWorker(workerpool.BaseWorker):
687 """The actual job workers. 688 689 """
690 - def RunTask(self, job): # pylint: disable-msg=W0221
691 """Job executor. 692 693 This functions processes a job. It is closely tied to the _QueuedJob and 694 _QueuedOpCode classes. 695 696 @type job: L{_QueuedJob} 697 @param job: the job to be processed 698 699 """ 700 self.SetTaskName("Job%s" % job.id) 701 702 logging.info("Processing job %s", job.id) 703 proc = mcpu.Processor(self.pool.queue.context, job.id) 704 queue = job.queue 705 try: 706 try: 707 count = len(job.ops) 708 for idx, op in enumerate(job.ops): 709 op_summary = op.input.Summary() 710 if op.status == constants.OP_STATUS_SUCCESS: 711 # this is a job that was partially completed before master 712 # daemon shutdown, so it can be expected that some opcodes 713 # are already completed successfully (if any did error 714 # out, then the whole job should have been aborted and not 715 # resubmitted for processing) 716 logging.info("Op %s/%s: opcode %s already processed, skipping", 717 idx + 1, count, op_summary) 718 continue 719 try: 720 logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, 721 op_summary) 722 723 queue.acquire(shared=1) 724 try: 725 if op.status == constants.OP_STATUS_CANCELED: 726 logging.debug("Canceling opcode") 727 raise CancelJob() 728 assert op.status == constants.OP_STATUS_QUEUED 729 logging.debug("Opcode %s/%s waiting for locks", 730 idx + 1, count) 731 op.status = constants.OP_STATUS_WAITLOCK 732 op.result = None 733 op.start_timestamp = TimeStampNow() 734 if idx == 0: # first opcode 735 job.start_timestamp = op.start_timestamp 736 queue.UpdateJobUnlocked(job) 737 738 input_opcode = op.input 739 finally: 740 queue.release() 741 742 # Make sure not to hold queue lock while calling ExecOpCode 743 result = proc.ExecOpCode(input_opcode, 744 _OpExecCallbacks(queue, job, op)) 745 746 queue.acquire(shared=1) 747 try: 748 logging.debug("Opcode %s/%s succeeded", idx + 1, count) 749 op.status = constants.OP_STATUS_SUCCESS 750 op.result = result 751 op.end_timestamp = TimeStampNow() 752 if idx == count - 1: 753 job.end_timestamp = TimeStampNow() 754 755 # Consistency check 756 assert compat.all(i.status == constants.OP_STATUS_SUCCESS 757 for i in job.ops) 758 759 queue.UpdateJobUnlocked(job) 760 finally: 761 queue.release() 762 763 logging.info("Op %s/%s: Successfully finished opcode %s", 764 idx + 1, count, op_summary) 765 except CancelJob: 766 # Will be handled further up 767 raise 768 except Exception, err: 769 queue.acquire(shared=1) 770 try: 771 try: 772 logging.debug("Opcode %s/%s failed", idx + 1, count) 773 op.status = constants.OP_STATUS_ERROR 774 op.result = _EncodeOpError(err) 775 op.end_timestamp = TimeStampNow() 776 logging.info("Op %s/%s: Error in opcode %s: %s", 777 idx + 1, count, op_summary, err) 778 779 to_encode = errors.OpExecError("Preceding opcode failed") 780 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 781 _EncodeOpError(to_encode)) 782 783 # Consistency check 784 assert compat.all(i.status == constants.OP_STATUS_SUCCESS 785 for i in job.ops[:idx]) 786 assert compat.all(i.status == constants.OP_STATUS_ERROR and 787 errors.GetEncodedError(i.result) 788 for i in job.ops[idx:]) 789 finally: 790 job.end_timestamp = TimeStampNow() 791 queue.UpdateJobUnlocked(job) 792 finally: 793 queue.release() 794 raise 795 796 except CancelJob: 797 queue.acquire(shared=1) 798 try: 799 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 800 "Job canceled by request") 801 job.end_timestamp = TimeStampNow() 802 queue.UpdateJobUnlocked(job) 803 finally: 804 queue.release() 805 except errors.GenericError, err: 806 logging.exception("Ganeti exception") 807 
except: 808 logging.exception("Unhandled exception") 809 finally: 810 status = job.CalcStatus() 811 logging.info("Finished job %s, status = %s", job.id, status)
812
813 814 -class _JobQueueWorkerPool(workerpool.WorkerPool):
815 """Simple class implementing a job-processing workerpool. 816 817 """
818 - def __init__(self, queue):
819 super(_JobQueueWorkerPool, self).__init__("JobQueue", 820 JOBQUEUE_THREADS, 821 _JobQueueWorker) 822 self.queue = queue
823
824 825 -def _RequireOpenQueue(fn):
826 """Decorator for "public" functions. 827 828 This function should be used for all 'public' functions. That is, 829 functions usually called from other classes. Note that this should 830 be applied only to methods (not plain functions), since it expects 831 that the decorated function is called with a first argument that has 832 a '_queue_filelock' argument. 833 834 @warning: Use this decorator only after locking.ssynchronized 835 836 Example:: 837 @locking.ssynchronized(_LOCK) 838 @_RequireOpenQueue 839 def Example(self): 840 pass 841 842 """ 843 def wrapper(self, *args, **kwargs): 844 # pylint: disable-msg=W0212 845 assert self._queue_filelock is not None, "Queue should be open" 846 return fn(self, *args, **kwargs)
847 return wrapper 848
849 850 -class JobQueue(object):
851 """Queue used to manage the jobs. 852 853 @cvar _RE_JOB_FILE: regex matching the valid job file names 854 855 """ 856 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) 857
858 - def __init__(self, context):
859 """Constructor for JobQueue. 860 861 The constructor will initialize the job queue object and then 862 start loading the current jobs from disk, either for starting them 863 (if they were queue) or for aborting them (if they were already 864 running). 865 866 @type context: GanetiContext 867 @param context: the context object for access to the configuration 868 data and other ganeti objects 869 870 """ 871 self.context = context 872 self._memcache = weakref.WeakValueDictionary() 873 self._my_hostname = netutils.HostInfo().name 874 875 # The Big JobQueue lock. If a code block or method acquires it in shared 876 # mode safe it must guarantee concurrency with all the code acquiring it in 877 # shared mode, including itself. In order not to acquire it at all 878 # concurrency must be guaranteed with all code acquiring it in shared mode 879 # and all code acquiring it exclusively. 880 self._lock = locking.SharedLock("JobQueue") 881 882 self.acquire = self._lock.acquire 883 self.release = self._lock.release 884 885 # Initialize the queue, and acquire the filelock. 886 # This ensures no other process is working on the job queue. 887 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) 888 889 # Read serial file 890 self._last_serial = jstore.ReadSerial() 891 assert self._last_serial is not None, ("Serial file was modified between" 892 " check in jstore and here") 893 894 # Get initial list of nodes 895 self._nodes = dict((n.name, n.primary_ip) 896 for n in self.context.cfg.GetAllNodesInfo().values() 897 if n.master_candidate) 898 899 # Remove master node 900 self._nodes.pop(self._my_hostname, None) 901 902 # TODO: Check consistency across nodes 903 904 self._queue_size = 0 905 self._UpdateQueueSizeUnlocked() 906 self._drained = self._IsQueueMarkedDrain() 907 908 # Setup worker pool 909 self._wpool = _JobQueueWorkerPool(self) 910 try: 911 self._InspectQueue() 912 except: 913 self._wpool.TerminateWorkers() 914 raise
915 916 @locking.ssynchronized(_LOCK) 917 @_RequireOpenQueue
918 - def _InspectQueue(self):
919 """Loads the whole job queue and resumes unfinished jobs. 920 921 This function needs the lock here because WorkerPool.AddTask() may start a 922 job while we're still doing our work. 923 924 """ 925 logging.info("Inspecting job queue") 926 927 all_job_ids = self._GetJobIDsUnlocked() 928 jobs_count = len(all_job_ids) 929 lastinfo = time.time() 930 for idx, job_id in enumerate(all_job_ids): 931 # Give an update every 1000 jobs or 10 seconds 932 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or 933 idx == (jobs_count - 1)): 934 logging.info("Job queue inspection: %d/%d (%0.1f %%)", 935 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) 936 lastinfo = time.time() 937 938 job = self._LoadJobUnlocked(job_id) 939 940 # a failure in loading the job can cause 'None' to be returned 941 if job is None: 942 continue 943 944 status = job.CalcStatus() 945 946 if status in (constants.JOB_STATUS_QUEUED, ): 947 self._wpool.AddTask((job, )) 948 949 elif status in (constants.JOB_STATUS_RUNNING, 950 constants.JOB_STATUS_WAITLOCK, 951 constants.JOB_STATUS_CANCELING): 952 logging.warning("Unfinished job %s found: %s", job.id, job) 953 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, 954 "Unclean master daemon shutdown") 955 self.UpdateJobUnlocked(job) 956 957 logging.info("Job queue inspection finished")
958 959 @locking.ssynchronized(_LOCK) 960 @_RequireOpenQueue
961 - def AddNode(self, node):
962 """Register a new node with the queue. 963 964 @type node: L{objects.Node} 965 @param node: the node object to be added 966 967 """ 968 node_name = node.name 969 assert node_name != self._my_hostname 970 971 # Clean queue directory on added node 972 result = rpc.RpcRunner.call_jobqueue_purge(node_name) 973 msg = result.fail_msg 974 if msg: 975 logging.warning("Cannot cleanup queue directory on node %s: %s", 976 node_name, msg) 977 978 if not node.master_candidate: 979 # remove if existing, ignoring errors 980 self._nodes.pop(node_name, None) 981 # and skip the replication of the job ids 982 return 983 984 # Upload the whole queue excluding archived jobs 985 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] 986 987 # Upload current serial file 988 files.append(constants.JOB_QUEUE_SERIAL_FILE) 989 990 for file_name in files: 991 # Read file content 992 content = utils.ReadFile(file_name) 993 994 result = rpc.RpcRunner.call_jobqueue_update([node_name], 995 [node.primary_ip], 996 file_name, content) 997 msg = result[node_name].fail_msg 998 if msg: 999 logging.error("Failed to upload file %s to node %s: %s", 1000 file_name, node_name, msg) 1001 1002 self._nodes[node_name] = node.primary_ip
1003 1004 @locking.ssynchronized(_LOCK) 1005 @_RequireOpenQueue
1006 - def RemoveNode(self, node_name):
1007 """Callback called when removing nodes from the cluster. 1008 1009 @type node_name: str 1010 @param node_name: the name of the node to remove 1011 1012 """ 1013 self._nodes.pop(node_name, None)
1014 1015 @staticmethod
1016 - def _CheckRpcResult(result, nodes, failmsg):
1017 """Verifies the status of an RPC call. 1018 1019 Since we aim to keep consistency should this node (the current 1020 master) fail, we will log errors if our rpc fail, and especially 1021 log the case when more than half of the nodes fails. 1022 1023 @param result: the data as returned from the rpc call 1024 @type nodes: list 1025 @param nodes: the list of nodes we made the call to 1026 @type failmsg: str 1027 @param failmsg: the identifier to be used for logging 1028 1029 """ 1030 failed = [] 1031 success = [] 1032 1033 for node in nodes: 1034 msg = result[node].fail_msg 1035 if msg: 1036 failed.append(node) 1037 logging.error("RPC call %s (%s) failed on node %s: %s", 1038 result[node].call, failmsg, node, msg) 1039 else: 1040 success.append(node) 1041 1042 # +1 for the master node 1043 if (len(success) + 1) < len(failed): 1044 # TODO: Handle failing nodes 1045 logging.error("More than half of the nodes failed")
1046
1047 - def _GetNodeIp(self):
1048 """Helper for returning the node name/ip list. 1049 1050 @rtype: (list, list) 1051 @return: a tuple of two lists, the first one with the node 1052 names and the second one with the node addresses 1053 1054 """ 1055 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 1056 name_list = self._nodes.keys() 1057 addr_list = [self._nodes[name] for name in name_list] 1058 return name_list, addr_list
1059
1060 - def _UpdateJobQueueFile(self, file_name, data, replicate):
1061 """Writes a file locally and then replicates it to all nodes. 1062 1063 This function will replace the contents of a file on the local 1064 node and then replicate it to all the other nodes we have. 1065 1066 @type file_name: str 1067 @param file_name: the path of the file to be replicated 1068 @type data: str 1069 @param data: the new contents of the file 1070 @type replicate: boolean 1071 @param replicate: whether to spread the changes to the remote nodes 1072 1073 """ 1074 utils.WriteFile(file_name, data=data) 1075 1076 if replicate: 1077 names, addrs = self._GetNodeIp() 1078 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) 1079 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1080
1081 - def _RenameFilesUnlocked(self, rename):
1082 """Renames a file locally and then replicate the change. 1083 1084 This function will rename a file in the local queue directory 1085 and then replicate this rename to all the other nodes we have. 1086 1087 @type rename: list of (old, new) 1088 @param rename: List containing tuples mapping old to new names 1089 1090 """ 1091 # Rename them locally 1092 for old, new in rename: 1093 utils.RenameFile(old, new, mkdir=True) 1094 1095 # ... and on all nodes 1096 names, addrs = self._GetNodeIp() 1097 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) 1098 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1099 1100 @staticmethod
1101 - def _FormatJobID(job_id):
1102 """Convert a job ID to string format. 1103 1104 Currently this just does C{str(job_id)} after performing some 1105 checks, but if we want to change the job id format this will 1106 abstract this change. 1107 1108 @type job_id: int or long 1109 @param job_id: the numeric job id 1110 @rtype: str 1111 @return: the formatted job id 1112 1113 """ 1114 if not isinstance(job_id, (int, long)): 1115 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) 1116 if job_id < 0: 1117 raise errors.ProgrammerError("Job ID %s is negative" % job_id) 1118 1119 return str(job_id)
1120 1121 @classmethod
1122 - def _GetArchiveDirectory(cls, job_id):
1123 """Returns the archive directory for a job. 1124 1125 @type job_id: str 1126 @param job_id: Job identifier 1127 @rtype: str 1128 @return: Directory name 1129 1130 """ 1131 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1132
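A quick worked example of the directory bucketing above:

    # With JOBS_PER_ARCHIVE_DIRECTORY = 10000:
    #   _GetArchiveDirectory("9999")   -> "0"
    #   _GetArchiveDirectory("12345")  -> "1"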
1133 - def _NewSerialsUnlocked(self, count):
1134 """Generates a new job identifier. 1135 1136 Job identifiers are unique during the lifetime of a cluster. 1137 1138 @type count: integer 1139 @param count: how many serials to return 1140 @rtype: str 1141 @return: a string representing the job identifier. 1142 1143 """ 1144 assert count > 0 1145 # New number 1146 serial = self._last_serial + count 1147 1148 # Write to file 1149 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, 1150 "%s\n" % serial, True) 1151 1152 result = [self._FormatJobID(v) 1153 for v in range(self._last_serial, serial + 1)] 1154 # Keep it only if we were able to write the file 1155 self._last_serial = serial 1156 1157 return result
1158 1159 @staticmethod
1160 - def _GetJobPath(job_id):
1161 """Returns the job file for a given job id. 1162 1163 @type job_id: str 1164 @param job_id: the job identifier 1165 @rtype: str 1166 @return: the path to the job file 1167 1168 """ 1169 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1170 1171 @classmethod
1172 - def _GetArchivedJobPath(cls, job_id):
1173 """Returns the archived job file for a give job id. 1174 1175 @type job_id: str 1176 @param job_id: the job identifier 1177 @rtype: str 1178 @return: the path to the archived job file 1179 1180 """ 1181 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, 1182 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1183
1184 - def _GetJobIDsUnlocked(self, sort=True):
1185 """Return all known job IDs. 1186 1187 The method only looks at disk because it's a requirement that all 1188 jobs are present on disk (so in the _memcache we don't have any 1189 extra IDs). 1190 1191 @type sort: boolean 1192 @param sort: perform sorting on the returned job ids 1193 @rtype: list 1194 @return: the list of job IDs 1195 1196 """ 1197 jlist = [] 1198 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): 1199 m = self._RE_JOB_FILE.match(filename) 1200 if m: 1201 jlist.append(m.group(1)) 1202 if sort: 1203 jlist = utils.NiceSort(jlist) 1204 return jlist
1205
1206 - def _LoadJobUnlocked(self, job_id):
1207 """Loads a job from the disk or memory. 1208 1209 Given a job id, this will return the cached job object if 1210 existing, or try to load the job from the disk. If loading from 1211 disk, it will also add the job to the cache. 1212 1213 @param job_id: the job id 1214 @rtype: L{_QueuedJob} or None 1215 @return: either None or the job object 1216 1217 """ 1218 job = self._memcache.get(job_id, None) 1219 if job: 1220 logging.debug("Found job %s in memcache", job_id) 1221 return job 1222 1223 try: 1224 job = self._LoadJobFromDisk(job_id) 1225 if job is None: 1226 return job 1227 except errors.JobFileCorrupted: 1228 old_path = self._GetJobPath(job_id) 1229 new_path = self._GetArchivedJobPath(job_id) 1230 if old_path == new_path: 1231 # job already archived (future case) 1232 logging.exception("Can't parse job %s", job_id) 1233 else: 1234 # non-archived case 1235 logging.exception("Can't parse job %s, will archive.", job_id) 1236 self._RenameFilesUnlocked([(old_path, new_path)]) 1237 return None 1238 1239 self._memcache[job_id] = job 1240 logging.debug("Added job %s to the cache", job_id) 1241 return job
1242
1243 - def _LoadJobFromDisk(self, job_id):
1244 """Load the given job file from disk. 1245 1246 Given a job file, read, load and restore it in a _QueuedJob format. 1247 1248 @type job_id: string 1249 @param job_id: job identifier 1250 @rtype: L{_QueuedJob} or None 1251 @return: either None or the job object 1252 1253 """ 1254 filepath = self._GetJobPath(job_id) 1255 logging.debug("Loading job from %s", filepath) 1256 try: 1257 raw_data = utils.ReadFile(filepath) 1258 except EnvironmentError, err: 1259 if err.errno in (errno.ENOENT, ): 1260 return None 1261 raise 1262 1263 try: 1264 data = serializer.LoadJson(raw_data) 1265 job = _QueuedJob.Restore(self, data) 1266 except Exception, err: # pylint: disable-msg=W0703 1267 raise errors.JobFileCorrupted(err) 1268 1269 return job
1270
1271 - def SafeLoadJobFromDisk(self, job_id):
1272 """Load the given job file from disk. 1273 1274 Given a job file, read, load and restore it in a _QueuedJob format. 1275 In case of error reading the job, it gets returned as None, and the 1276 exception is logged. 1277 1278 @type job_id: string 1279 @param job_id: job identifier 1280 @rtype: L{_QueuedJob} or None 1281 @return: either None or the job object 1282 1283 """ 1284 try: 1285 return self._LoadJobFromDisk(job_id) 1286 except (errors.JobFileCorrupted, EnvironmentError): 1287 logging.exception("Can't load/parse job %s", job_id) 1288 return None
1289 1290 @staticmethod
1291 - def _IsQueueMarkedDrain():
1292 """Check if the queue is marked from drain. 1293 1294 This currently uses the queue drain file, which makes it a 1295 per-node flag. In the future this can be moved to the config file. 1296 1297 @rtype: boolean 1298 @return: True of the job queue is marked for draining 1299 1300 """ 1301 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1302
1303 - def _UpdateQueueSizeUnlocked(self):
1304 """Update the queue size. 1305 1306 """ 1307 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1308 1309 @locking.ssynchronized(_LOCK) 1310 @_RequireOpenQueue
1311 - def SetDrainFlag(self, drain_flag):
1312 """Sets the drain flag for the queue. 1313 1314 @type drain_flag: boolean 1315 @param drain_flag: Whether to set or unset the drain flag 1316 1317 """ 1318 if drain_flag: 1319 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) 1320 else: 1321 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) 1322 1323 self._drained = drain_flag 1324 1325 return True
1326 1327 @_RequireOpenQueue
1328 - def _SubmitJobUnlocked(self, job_id, ops):
1329 """Create and store a new job. 1330 1331 This enters the job into our job queue and also puts it on the new 1332 queue, in order for it to be picked up by the queue processors. 1333 1334 @type job_id: job ID 1335 @param job_id: the job ID for the new job 1336 @type ops: list 1337 @param ops: The list of OpCodes that will become the new job. 1338 @rtype: L{_QueuedJob} 1339 @return: the job object to be queued 1340 @raise errors.JobQueueDrainError: if the job queue is marked for draining 1341 @raise errors.JobQueueFull: if the job queue has too many jobs in it 1342 1343 """ 1344 # Ok when sharing the big job queue lock, as the drain file is created when 1345 # the lock is exclusive. 1346 if self._drained: 1347 raise errors.JobQueueDrainError("Job queue is drained, refusing job") 1348 1349 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: 1350 raise errors.JobQueueFull() 1351 1352 job = _QueuedJob(self, job_id, ops) 1353 1354 # Write to disk 1355 self.UpdateJobUnlocked(job) 1356 1357 self._queue_size += 1 1358 1359 logging.debug("Adding new job %s to the cache", job_id) 1360 self._memcache[job_id] = job 1361 1362 return job
1363 1364 @locking.ssynchronized(_LOCK) 1365 @_RequireOpenQueue
1366 - def SubmitJob(self, ops):
1367 """Create and store a new job. 1368 1369 @see: L{_SubmitJobUnlocked} 1370 1371 """ 1372 job_id = self._NewSerialsUnlocked(1)[0] 1373 self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), )) 1374 return job_id
1375 1376 @locking.ssynchronized(_LOCK) 1377 @_RequireOpenQueue
1378 - def SubmitManyJobs(self, jobs):
1379 """Create and store multiple jobs. 1380 1381 @see: L{_SubmitJobUnlocked} 1382 1383 """ 1384 results = [] 1385 tasks = [] 1386 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 1387 for job_id, ops in zip(all_job_ids, jobs): 1388 try: 1389 tasks.append((self._SubmitJobUnlocked(job_id, ops), )) 1390 status = True 1391 data = job_id 1392 except errors.GenericError, err: 1393 data = str(err) 1394 status = False 1395 results.append((status, data)) 1396 self._wpool.AddManyTasks(tasks) 1397 1398 return results
1399 1400 @_RequireOpenQueue
1401 - def UpdateJobUnlocked(self, job, replicate=True):
1402 """Update a job's on disk storage. 1403 1404 After a job has been modified, this function needs to be called in 1405 order to write the changes to disk and replicate them to the other 1406 nodes. 1407 1408 @type job: L{_QueuedJob} 1409 @param job: the changed job 1410 @type replicate: boolean 1411 @param replicate: whether to replicate the change to remote nodes 1412 1413 """ 1414 filename = self._GetJobPath(job.id) 1415 data = serializer.DumpJson(job.Serialize(), indent=False) 1416 logging.debug("Writing job %s to %s", job.id, filename) 1417 self._UpdateJobQueueFile(filename, data, replicate)
1418
1419 - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, 1420 timeout):
1421 """Waits for changes in a job. 1422 1423 @type job_id: string 1424 @param job_id: Job identifier 1425 @type fields: list of strings 1426 @param fields: Which fields to check for changes 1427 @type prev_job_info: list or None 1428 @param prev_job_info: Last job information returned 1429 @type prev_log_serial: int 1430 @param prev_log_serial: Last job message serial number 1431 @type timeout: float 1432 @param timeout: maximum time to wait in seconds 1433 @rtype: tuple (job info, log entries) 1434 @return: a tuple of the job information as required via 1435 the fields parameter, and the log entries as a list 1436 1437 if the job has not changed and the timeout has expired, 1438 we instead return a special value, 1439 L{constants.JOB_NOTCHANGED}, which should be interpreted 1440 as such by the clients 1441 1442 """ 1443 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id) 1444 1445 helper = _WaitForJobChangesHelper() 1446 1447 return helper(self._GetJobPath(job_id), load_fn, 1448 fields, prev_job_info, prev_log_serial, timeout)
1449 1450 @locking.ssynchronized(_LOCK) 1451 @_RequireOpenQueue
1452 - def CancelJob(self, job_id):
1453 """Cancels a job. 1454 1455 This will only succeed if the job has not started yet. 1456 1457 @type job_id: string 1458 @param job_id: job ID of job to be cancelled. 1459 1460 """ 1461 logging.info("Cancelling job %s", job_id) 1462 1463 job = self._LoadJobUnlocked(job_id) 1464 if not job: 1465 logging.debug("Job %s not found", job_id) 1466 return (False, "Job %s not found" % job_id) 1467 1468 job_status = job.CalcStatus() 1469 1470 if job_status not in (constants.JOB_STATUS_QUEUED, 1471 constants.JOB_STATUS_WAITLOCK): 1472 logging.debug("Job %s is no longer waiting in the queue", job.id) 1473 return (False, "Job %s is no longer waiting in the queue" % job.id) 1474 1475 if job_status == constants.JOB_STATUS_QUEUED: 1476 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, 1477 "Job canceled by request") 1478 msg = "Job %s canceled" % job.id 1479 1480 elif job_status == constants.JOB_STATUS_WAITLOCK: 1481 # The worker will notice the new status and cancel the job 1482 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) 1483 msg = "Job %s will be canceled" % job.id 1484 1485 self.UpdateJobUnlocked(job) 1486 1487 return (True, msg)
1488 1489 @_RequireOpenQueue
1490 - def _ArchiveJobsUnlocked(self, jobs):
1491 """Archives jobs. 1492 1493 @type jobs: list of L{_QueuedJob} 1494 @param jobs: Job objects 1495 @rtype: int 1496 @return: Number of archived jobs 1497 1498 """ 1499 archive_jobs = [] 1500 rename_files = [] 1501 for job in jobs: 1502 if job.CalcStatus() not in constants.JOBS_FINALIZED: 1503 logging.debug("Job %s is not yet done", job.id) 1504 continue 1505 1506 archive_jobs.append(job) 1507 1508 old = self._GetJobPath(job.id) 1509 new = self._GetArchivedJobPath(job.id) 1510 rename_files.append((old, new)) 1511 1512 # TODO: What if 1..n files fail to rename? 1513 self._RenameFilesUnlocked(rename_files) 1514 1515 logging.debug("Successfully archived job(s) %s", 1516 utils.CommaJoin(job.id for job in archive_jobs)) 1517 1518 # Since we haven't quite checked, above, if we succeeded or failed renaming 1519 # the files, we update the cached queue size from the filesystem. When we 1520 # get around to fix the TODO: above, we can use the number of actually 1521 # archived jobs to fix this. 1522 self._UpdateQueueSizeUnlocked() 1523 return len(archive_jobs)
1524 1525 @locking.ssynchronized(_LOCK) 1526 @_RequireOpenQueue
1527 - def ArchiveJob(self, job_id):
1528 """Archives a job. 1529 1530 This is just a wrapper over L{_ArchiveJobsUnlocked}. 1531 1532 @type job_id: string 1533 @param job_id: Job ID of job to be archived. 1534 @rtype: bool 1535 @return: Whether job was archived 1536 1537 """ 1538 logging.info("Archiving job %s", job_id) 1539 1540 job = self._LoadJobUnlocked(job_id) 1541 if not job: 1542 logging.debug("Job %s not found", job_id) 1543 return False 1544 1545 return self._ArchiveJobsUnlocked([job]) == 1
1546 1547 @locking.ssynchronized(_LOCK) 1548 @_RequireOpenQueue
1549 - def AutoArchiveJobs(self, age, timeout):
1550 """Archives all jobs based on age. 1551 1552 The method will archive all jobs which are older than the age 1553 parameter. For jobs that don't have an end timestamp, the start 1554 timestamp will be considered. The special '-1' age will cause 1555 archival of all jobs (that are not running or queued). 1556 1557 @type age: int 1558 @param age: the minimum age in seconds 1559 1560 """ 1561 logging.info("Archiving jobs with age more than %s seconds", age) 1562 1563 now = time.time() 1564 end_time = now + timeout 1565 archived_count = 0 1566 last_touched = 0 1567 1568 all_job_ids = self._GetJobIDsUnlocked() 1569 pending = [] 1570 for idx, job_id in enumerate(all_job_ids): 1571 last_touched = idx + 1 1572 1573 # Not optimal because jobs could be pending 1574 # TODO: Measure average duration for job archival and take number of 1575 # pending jobs into account. 1576 if time.time() > end_time: 1577 break 1578 1579 # Returns None if the job failed to load 1580 job = self._LoadJobUnlocked(job_id) 1581 if job: 1582 if job.end_timestamp is None: 1583 if job.start_timestamp is None: 1584 job_age = job.received_timestamp 1585 else: 1586 job_age = job.start_timestamp 1587 else: 1588 job_age = job.end_timestamp 1589 1590 if age == -1 or now - job_age[0] > age: 1591 pending.append(job) 1592 1593 # Archive 10 jobs at a time 1594 if len(pending) >= 10: 1595 archived_count += self._ArchiveJobsUnlocked(pending) 1596 pending = [] 1597 1598 if pending: 1599 archived_count += self._ArchiveJobsUnlocked(pending) 1600 1601 return (archived_count, len(all_job_ids) - last_touched)
1602
1603 - def QueryJobs(self, job_ids, fields):
1604 """Returns a list of jobs in queue. 1605 1606 @type job_ids: list 1607 @param job_ids: sequence of job identifiers or None for all 1608 @type fields: list 1609 @param fields: names of fields to return 1610 @rtype: list 1611 @return: list one element per job, each element being list with 1612 the requested fields 1613 1614 """ 1615 jobs = [] 1616 list_all = False 1617 if not job_ids: 1618 # Since files are added to/removed from the queue atomically, there's no 1619 # risk of getting the job ids in an inconsistent state. 1620 job_ids = self._GetJobIDsUnlocked() 1621 list_all = True 1622 1623 for job_id in job_ids: 1624 job = self.SafeLoadJobFromDisk(job_id) 1625 if job is not None: 1626 jobs.append(job.GetInfo(fields)) 1627 elif not list_all: 1628 jobs.append(None) 1629 1630 return jobs
1631 1632 @locking.ssynchronized(_LOCK) 1633 @_RequireOpenQueue
1634 - def Shutdown(self):
1635 """Stops the job queue. 1636 1637 This shutdowns all the worker threads an closes the queue. 1638 1639 """ 1640 self._wpool.TerminateWorkers() 1641 1642 self._queue_filelock.Close() 1643 self._queue_filelock = None
1644