Package ganeti :: Module jqueue
[hide private]
[frames] | no frames]

Source Code for Module ganeti.jqueue

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the job queue handling. 
  23   
  24  Locking: there's a single, large lock in the L{JobQueue} class. It's 
  25  used by all other classes in this module. 
  26   
  27  @var JOBQUEUE_THREADS: the number of worker threads we start for 
  28      processing jobs 
  29   
  30  """ 
  31   
  32  import os 
  33  import logging 
  34  import threading 
  35  import errno 
  36  import re 
  37  import time 
  38  import weakref 
  39   
  40  from ganeti import constants 
  41  from ganeti import serializer 
  42  from ganeti import workerpool 
  43  from ganeti import opcodes 
  44  from ganeti import errors 
  45  from ganeti import mcpu 
  46  from ganeti import utils 
  47  from ganeti import jstore 
  48  from ganeti import rpc 
  49   
  50   
  51  JOBQUEUE_THREADS = 25 
  52  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
class CancelJob(Exception):
  """Raised inside the job processing code to request job cancellation.

  """
59
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
69
70 71 -class _QueuedOpCode(object):
72 """Encapsulates an opcode object. 73 74 @ivar log: holds the execution log and consists of tuples 75 of the form C{(log_serial, timestamp, level, message)} 76 @ivar input: the OpCode we encapsulate 77 @ivar status: the current status 78 @ivar result: the result of the LU execution 79 @ivar start_timestamp: timestamp for the start of the execution 80 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation 81 @ivar stop_timestamp: timestamp for the end of the execution 82 83 """ 84 __slots__ = ["input", "status", "result", "log", 85 "start_timestamp", "exec_timestamp", "end_timestamp", 86 "__weakref__"] 87
88 - def __init__(self, op):
89 """Constructor for the _QuededOpCode. 90 91 @type op: L{opcodes.OpCode} 92 @param op: the opcode we encapsulate 93 94 """ 95 self.input = op 96 self.status = constants.OP_STATUS_QUEUED 97 self.result = None 98 self.log = [] 99 self.start_timestamp = None 100 self.exec_timestamp = None 101 self.end_timestamp = None
102 103 @classmethod
104 - def Restore(cls, state):
105 """Restore the _QueuedOpCode from the serialized form. 106 107 @type state: dict 108 @param state: the serialized state 109 @rtype: _QueuedOpCode 110 @return: a new _QueuedOpCode instance 111 112 """ 113 obj = _QueuedOpCode.__new__(cls) 114 obj.input = opcodes.OpCode.LoadOpCode(state["input"]) 115 obj.status = state["status"] 116 obj.result = state["result"] 117 obj.log = state["log"] 118 obj.start_timestamp = state.get("start_timestamp", None) 119 obj.exec_timestamp = state.get("exec_timestamp", None) 120 obj.end_timestamp = state.get("end_timestamp", None) 121 return obj
122
123 - def Serialize(self):
124 """Serializes this _QueuedOpCode. 125 126 @rtype: dict 127 @return: the dictionary holding the serialized state 128 129 """ 130 return { 131 "input": self.input.__getstate__(), 132 "status": self.status, 133 "result": self.result, 134 "log": self.log, 135 "start_timestamp": self.start_timestamp, 136 "exec_timestamp": self.exec_timestamp, 137 "end_timestamp": self.end_timestamp, 138 }
139
140 141 -class _QueuedJob(object):
142 """In-memory job representation. 143 144 This is what we use to track the user-submitted jobs. Locking must 145 be taken care of by users of this class. 146 147 @type queue: L{JobQueue} 148 @ivar queue: the parent queue 149 @ivar id: the job ID 150 @type ops: list 151 @ivar ops: the list of _QueuedOpCode that constitute the job 152 @type log_serial: int 153 @ivar log_serial: holds the index for the next log entry 154 @ivar received_timestamp: the timestamp for when the job was received 155 @ivar start_timestmap: the timestamp for start of execution 156 @ivar end_timestamp: the timestamp for end of execution 157 @ivar lock_status: In-memory locking information for debugging 158 @ivar change: a Condition variable we use for waiting for job changes 159 160 """ 161 # pylint: disable-msg=W0212 162 __slots__ = ["queue", "id", "ops", "log_serial", 163 "received_timestamp", "start_timestamp", "end_timestamp", 164 "lock_status", "change", 165 "__weakref__"] 166
167 - def __init__(self, queue, job_id, ops):
168 """Constructor for the _QueuedJob. 169 170 @type queue: L{JobQueue} 171 @param queue: our parent queue 172 @type job_id: job_id 173 @param job_id: our job id 174 @type ops: list 175 @param ops: the list of opcodes we hold, which will be encapsulated 176 in _QueuedOpCodes 177 178 """ 179 if not ops: 180 # TODO: use a better exception 181 raise Exception("No opcodes") 182 183 self.queue = queue 184 self.id = job_id 185 self.ops = [_QueuedOpCode(op) for op in ops] 186 self.log_serial = 0 187 self.received_timestamp = TimeStampNow() 188 self.start_timestamp = None 189 self.end_timestamp = None 190 191 # In-memory attributes 192 self.lock_status = None 193 194 # Condition to wait for changes 195 self.change = threading.Condition(self.queue._lock)
196
197 - def __repr__(self):
198 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 199 "id=%s" % self.id, 200 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] 201 202 return "<%s at %#x>" % (" ".join(status), id(self))
203 204 @classmethod
205 - def Restore(cls, queue, state):
206 """Restore a _QueuedJob from serialized state: 207 208 @type queue: L{JobQueue} 209 @param queue: to which queue the restored job belongs 210 @type state: dict 211 @param state: the serialized state 212 @rtype: _JobQueue 213 @return: the restored _JobQueue instance 214 215 """ 216 obj = _QueuedJob.__new__(cls) 217 obj.queue = queue 218 obj.id = state["id"] 219 obj.received_timestamp = state.get("received_timestamp", None) 220 obj.start_timestamp = state.get("start_timestamp", None) 221 obj.end_timestamp = state.get("end_timestamp", None) 222 223 # In-memory attributes 224 obj.lock_status = None 225 226 obj.ops = [] 227 obj.log_serial = 0 228 for op_state in state["ops"]: 229 op = _QueuedOpCode.Restore(op_state) 230 for log_entry in op.log: 231 obj.log_serial = max(obj.log_serial, log_entry[0]) 232 obj.ops.append(op) 233 234 # Condition to wait for changes 235 obj.change = threading.Condition(obj.queue._lock) 236 237 return obj
238
239 - def Serialize(self):
240 """Serialize the _JobQueue instance. 241 242 @rtype: dict 243 @return: the serialized state 244 245 """ 246 return { 247 "id": self.id, 248 "ops": [op.Serialize() for op in self.ops], 249 "start_timestamp": self.start_timestamp, 250 "end_timestamp": self.end_timestamp, 251 "received_timestamp": self.received_timestamp, 252 }
253
254 - def CalcStatus(self):
255 """Compute the status of this job. 256 257 This function iterates over all the _QueuedOpCodes in the job and 258 based on their status, computes the job status. 259 260 The algorithm is: 261 - if we find a cancelled, or finished with error, the job 262 status will be the same 263 - otherwise, the last opcode with the status one of: 264 - waitlock 265 - canceling 266 - running 267 268 will determine the job status 269 270 - otherwise, it means either all opcodes are queued, or success, 271 and the job status will be the same 272 273 @return: the job status 274 275 """ 276 status = constants.JOB_STATUS_QUEUED 277 278 all_success = True 279 for op in self.ops: 280 if op.status == constants.OP_STATUS_SUCCESS: 281 continue 282 283 all_success = False 284 285 if op.status == constants.OP_STATUS_QUEUED: 286 pass 287 elif op.status == constants.OP_STATUS_WAITLOCK: 288 status = constants.JOB_STATUS_WAITLOCK 289 elif op.status == constants.OP_STATUS_RUNNING: 290 status = constants.JOB_STATUS_RUNNING 291 elif op.status == constants.OP_STATUS_CANCELING: 292 status = constants.JOB_STATUS_CANCELING 293 break 294 elif op.status == constants.OP_STATUS_ERROR: 295 status = constants.JOB_STATUS_ERROR 296 # The whole job fails if one opcode failed 297 break 298 elif op.status == constants.OP_STATUS_CANCELED: 299 status = constants.OP_STATUS_CANCELED 300 break 301 302 if all_success: 303 status = constants.JOB_STATUS_SUCCESS 304 305 return status
306
307 - def GetLogEntries(self, newer_than):
308 """Selectively returns the log entries. 309 310 @type newer_than: None or int 311 @param newer_than: if this is None, return all log entries, 312 otherwise return only the log entries with serial higher 313 than this value 314 @rtype: list 315 @return: the list of the log entries selected 316 317 """ 318 if newer_than is None: 319 serial = -1 320 else: 321 serial = newer_than 322 323 entries = [] 324 for op in self.ops: 325 entries.extend(filter(lambda entry: entry[0] > serial, op.log)) 326 327 return entries
328
329 - def MarkUnfinishedOps(self, status, result):
330 """Mark unfinished opcodes with a given status and result. 331 332 This is an utility function for marking all running or waiting to 333 be run opcodes with a given status. Opcodes which are already 334 finalised are not changed. 335 336 @param status: a given opcode status 337 @param result: the opcode result 338 339 """ 340 not_marked = True 341 for op in self.ops: 342 if op.status in constants.OPS_FINALIZED: 343 assert not_marked, "Finalized opcodes found after non-finalized ones" 344 continue 345 op.status = status 346 op.result = result 347 not_marked = False
348
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks handed to the opcode processor while one opcode executes.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was marked as canceling

    """
    opcode = self._op

    self._queue.acquire()
    try:
      assert opcode.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      opcode.status = constants.OP_STATUS_RUNNING
      opcode.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(message, )} or C{(log_type, message)}; with a single
    argument the log type is C{constants.ELOG_MESSAGE}.

    """
    argc = len(args)
    assert argc < 3

    if argc == 1:
      log_type = constants.ELOG_MESSAGE
      (log_msg, ) = args
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    job = self._job

    self._queue.acquire()
    try:
      job.log_serial += 1
      entry = (job.log_serial, timestamp, log_type, log_msg)
      self._op.log.append(entry)

      job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
429
430 431 -class _JobQueueWorker(workerpool.BaseWorker):
432 """The actual job workers. 433 434 """
435 - def RunTask(self, job): # pylint: disable-msg=W0221
436 """Job executor. 437 438 This functions processes a job. It is closely tied to the _QueuedJob and 439 _QueuedOpCode classes. 440 441 @type job: L{_QueuedJob} 442 @param job: the job to be processed 443 444 """ 445 logging.info("Processing job %s", job.id) 446 proc = mcpu.Processor(self.pool.queue.context, job.id) 447 queue = job.queue 448 try: 449 try: 450 count = len(job.ops) 451 for idx, op in enumerate(job.ops): 452 op_summary = op.input.Summary() 453 if op.status == constants.OP_STATUS_SUCCESS: 454 # this is a job that was partially completed before master 455 # daemon shutdown, so it can be expected that some opcodes 456 # are already completed successfully (if any did error 457 # out, then the whole job should have been aborted and not 458 # resubmitted for processing) 459 logging.info("Op %s/%s: opcode %s already processed, skipping", 460 idx + 1, count, op_summary) 461 continue 462 try: 463 logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, 464 op_summary) 465 466 queue.acquire() 467 try: 468 if op.status == constants.OP_STATUS_CANCELED: 469 raise CancelJob() 470 assert op.status == constants.OP_STATUS_QUEUED 471 op.status = constants.OP_STATUS_WAITLOCK 472 op.result = None 473 op.start_timestamp = TimeStampNow() 474 if idx == 0: # first opcode 475 job.start_timestamp = op.start_timestamp 476 queue.UpdateJobUnlocked(job) 477 478 input_opcode = op.input 479 finally: 480 queue.release() 481 482 # Make sure not to hold queue lock while calling ExecOpCode 483 result = proc.ExecOpCode(input_opcode, 484 _OpExecCallbacks(queue, job, op)) 485 486 queue.acquire() 487 try: 488 op.status = constants.OP_STATUS_SUCCESS 489 op.result = result 490 op.end_timestamp = TimeStampNow() 491 queue.UpdateJobUnlocked(job) 492 finally: 493 queue.release() 494 495 logging.info("Op %s/%s: Successfully finished opcode %s", 496 idx + 1, count, op_summary) 497 except CancelJob: 498 # Will be handled further up 499 raise 500 except Exception, err: 501 queue.acquire() 502 
try: 503 try: 504 op.status = constants.OP_STATUS_ERROR 505 if isinstance(err, errors.GenericError): 506 op.result = errors.EncodeException(err) 507 else: 508 op.result = str(err) 509 op.end_timestamp = TimeStampNow() 510 logging.info("Op %s/%s: Error in opcode %s: %s", 511 idx + 1, count, op_summary, err) 512 finally: 513 queue.UpdateJobUnlocked(job) 514 finally: 515 queue.release() 516 raise 517 518 except CancelJob: 519 queue.acquire() 520 try: 521 queue.CancelJobUnlocked(job) 522 finally: 523 queue.release() 524 except errors.GenericError, err: 525 logging.exception("Ganeti exception") 526 except: 527 logging.exception("Unhandled exception") 528 finally: 529 queue.acquire() 530 try: 531 try: 532 job.lock_status = None 533 job.end_timestamp = TimeStampNow() 534 queue.UpdateJobUnlocked(job) 535 finally: 536 job_id = job.id 537 status = job.CalcStatus() 538 finally: 539 queue.release() 540 541 logging.info("Finished job %s, status = %s", job_id, status)
542
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool specialised for processing jobs from the job queue.

  """
  def __init__(self, queue):
    """Create the pool and remember the queue it works for.

    @type queue: L{JobQueue}
    @param queue: the job queue owning this pool

    """
    pool_args = ("JobQueue", JOBQUEUE_THREADS, _JobQueueWorker)
    super(_JobQueueWorkerPool, self).__init__(*pool_args)
    self.queue = queue
553
554 555 -def _RequireOpenQueue(fn):
556 """Decorator for "public" functions. 557 558 This function should be used for all 'public' functions. That is, 559 functions usually called from other classes. Note that this should 560 be applied only to methods (not plain functions), since it expects 561 that the decorated function is called with a first argument that has 562 a '_queue_lock' argument. 563 564 @warning: Use this decorator only after utils.LockedMethod! 565 566 Example:: 567 @utils.LockedMethod 568 @_RequireOpenQueue 569 def Example(self): 570 pass 571 572 """ 573 def wrapper(self, *args, **kwargs): 574 # pylint: disable-msg=W0212 575 assert self._queue_lock is not None, "Queue should be open" 576 return fn(self, *args, **kwargs)
577 return wrapper 578
579 580 -class JobQueue(object):
581 """Queue used to manage the jobs. 582 583 @cvar _RE_JOB_FILE: regex matching the valid job file names 584 585 """ 586 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) 587
def __init__(self, context):
  """Constructor for JobQueue.

  The constructor will initialize the job queue object and then
  start loading the current jobs from disk, either for starting them
  (if they were queued) or for aborting them (if they were already
  running).

  @type context: GanetiContext
  @param context: the context object for access to the configuration
      data and other ganeti objects

  """
  self.context = context
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = utils.HostInfo().name

  # Locking
  self._lock = threading.Lock()
  self.acquire = self._lock.acquire
  self.release = self._lock.release

  # Initialize
  self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Get initial list of nodes (master candidates only)
  self._nodes = dict((n.name, n.primary_ip)
                     for n in self.context.cfg.GetAllNodesInfo().values()
                     if n.master_candidate)

  # Remove master node
  self._nodes.pop(self._my_hostname, None)

  # TODO: Check consistency across nodes

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
  try:
    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      logging.info("Inspecting job queue")

      all_job_ids = self._GetJobIDsUnlocked()
      jobs_count = len(all_job_ids)
      last_idx = jobs_count - 1
      lastinfo = time.time()
      for idx, job_id in enumerate(all_job_ids):
        # Give an update every 1000 jobs or 10 seconds
        if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
            idx == last_idx):
          logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                       idx, last_idx, 100.0 * (idx + 1) / jobs_count)
          lastinfo = time.time()

        job = self._LoadJobUnlocked(job_id)

        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status == constants.JOB_STATUS_QUEUED:
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK,
                        constants.JOB_STATUS_CANCELING):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")
          finally:
            self.UpdateJobUnlocked(job)

      logging.info("Job queue inspection finished")
    finally:
      self.release()
  except:
    # Stop the already started workers before propagating the error
    self._wpool.TerminateWorkers()
    raise
@utils.LockedMethod
@_RequireOpenQueue
def AddNode(self, node):
  """Register a new node with the queue.

  The queue directory on the node is purged first; if the node is a master
  candidate, all non-archived job files and the serial file are then
  uploaded to it.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  name = node.name
  assert name != self._my_hostname

  # Clean queue directory on added node
  purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
  if purge_msg:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    name, purge_msg)

  if not node.master_candidate:
    # remove if existing, ignoring errors
    self._nodes.pop(name, None)
    # and skip the replication of the job ids
    return

  # Upload the whole queue excluding archived jobs, plus the current
  # serial file
  to_upload = ([self._GetJobPath(job_id)
                for job_id in self._GetJobIDsUnlocked()] +
               [constants.JOB_QUEUE_SERIAL_FILE])

  for file_name in to_upload:
    # Read file content
    content = utils.ReadFile(file_name)

    result = rpc.RpcRunner.call_jobqueue_update([name],
                                                [node.primary_ip],
                                                file_name, content)
    upload_msg = result[name].fail_msg
    if upload_msg:
      logging.error("Failed to upload file %s to node %s: %s",
                    file_name, name, upload_msg)

  self._nodes[name] = node.primary_ip
@utils.LockedMethod
@_RequireOpenQueue
def RemoveNode(self, node_name):
  """Callback called when removing nodes from the cluster.

  Unknown node names are silently ignored.

  @type node_name: str
  @param node_name: the name of the node to remove

  """
  # The queue is removed by the "leave node" RPC call.
  self._nodes.pop(node_name, None)
@staticmethod
def _CheckRpcResult(result, nodes, failmsg):
  """Verifies the status of an RPC call.

  Since we aim to keep consistency should this node (the current
  master) fail, we will log errors if our rpc fail, and especially
  log the case when more than half of the nodes fails.

  @param result: the data as returned from the rpc call
  @type nodes: list
  @param nodes: the list of nodes we made the call to
  @type failmsg: str
  @param failmsg: the identifier to be used for logging

  """
  num_failed = 0
  num_ok = 0

  for node in nodes:
    node_result = result[node]
    msg = node_result.fail_msg
    if not msg:
      num_ok += 1
      continue

    num_failed += 1
    logging.error("RPC call %s (%s) failed on node %s: %s",
                  node_result.call, failmsg, node, msg)

  # +1 for the master node
  if (num_ok + 1) < num_failed:
    # TODO: Handle failing nodes
    logging.error("More than half of the nodes failed")
770
771 - def _GetNodeIp(self):
772 """Helper for returning the node name/ip list. 773 774 @rtype: (list, list) 775 @return: a tuple of two lists, the first one with the node 776 names and the second one with the node addresses 777 778 """ 779 name_list = self._nodes.keys() 780 addr_list = [self._nodes[name] for name in name_list] 781 return name_list, addr_list
782
def _WriteAndReplicateFileUnlocked(self, file_name, data):
  """Writes a file locally and then replicates it to all nodes.

  This function will replace the contents of a file on the local
  node and then replicate it to all the other nodes we have.
  Replication failures are only logged.

  @type file_name: str
  @param file_name: the path of the file to be replicated
  @type data: str
  @param data: the new contents of the file

  """
  utils.WriteFile(file_name, data=data)

  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                  file_name, data)
  what = "Updating %s" % file_name
  self._CheckRpcResult(rpc_result, self._nodes, what)
801
def _RenameFilesUnlocked(self, rename):
  """Renames files locally and then replicates the change.

  This function will rename the files in the local queue directory
  and then replicate these renames to all the other nodes we have.
  Replication failures are only logged.

  @type rename: list of (old, new)
  @param rename: List containing tuples mapping old to new names

  """
  # Rename them locally
  for (old_name, new_name) in rename:
    utils.RenameFile(old_name, new_name, mkdir=True)

  # ... and on all nodes
  (node_names, node_addrs) = self._GetNodeIp()
  rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                  rename)
  what = "Renaming files (%r)" % rename
  self._CheckRpcResult(rpc_result, self._nodes, what)
@staticmethod
def _FormatJobID(job_id):
  """Convert a job ID to string format.

  Currently this just does C{str(job_id)} after performing some
  checks, but if we want to change the job id format this will
  abstract this change.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: str
  @return: the formatted job id
  @raise errors.ProgrammerError: if the job ID is not numeric or is negative

  """
  if isinstance(job_id, (int, long)):
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
    return str(job_id)

  raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
@classmethod
def _GetArchiveDirectory(cls, job_id):
  """Returns the archive directory for a job.

  Jobs are grouped into directories of C{JOBS_PER_ARCHIVE_DIRECTORY}
  jobs each; the directory name is the job number divided by that
  constant, rounded down.

  @type job_id: str
  @param job_id: Job identifier
  @rtype: str
  @return: Directory name

  """
  # Explicit floor division: the result must stay an integer regardless of
  # how the "/" operator is configured for this module
  return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
853
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster.

  @type count: integer
  @param count: how many serials to return
  @rtype: list of str
  @return: a list with exactly C{count} job identifiers, in
      increasing order

  """
  assert count > 0
  # New number
  serial = self._last_serial + count

  # Write to file
  self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                      "%s\n" % serial)

  # The first new serial is the one after the last allocated one; starting
  # at self._last_serial itself would hand out an already-allocated serial
  # and return one identifier too many
  result = [self._FormatJobID(v)
            for v in range(self._last_serial + 1, serial + 1)]

  # Keep it only if we were able to write the file
  self._last_serial = serial

  assert len(result) == count

  return result
@staticmethod
def _GetJobPath(job_id):
  """Returns the job file for a given job id.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the job file

  """
  file_name = "job-%s" % job_id
  return utils.PathJoin(constants.QUEUE_DIR, file_name)
@classmethod
def _GetArchivedJobPath(cls, job_id):
  """Returns the archived job file for a given job id.

  @type job_id: str
  @param job_id: the job identifier
  @rtype: str
  @return: the path to the archived job file

  """
  archive_dir = cls._GetArchiveDirectory(job_id)
  file_name = "job-%s" % job_id
  return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                        archive_dir, file_name)
@classmethod
def _ExtractJobID(cls, name):
  """Extract the job id from a filename.

  @type name: str
  @param name: the job filename
  @rtype: job id or None
  @return: the job id corresponding to the given filename,
      or None if the filename does not represent a valid
      job file

  """
  match = cls._RE_JOB_FILE.match(name)
  if match is None:
    return None
  return match.group(1)
922
def _GetJobIDsUnlocked(self, archived=False):
  """Return all known job IDs.

  If the parameter archived is True, archived jobs IDs will be
  included. Currently this argument is unused.

  The method only looks at disk because it's a requirement that all
  jobs are present on disk (so in the _memcache we don't have any
  extra IDs).

  @rtype: list
  @return: the list of job IDs, sorted with L{utils.NiceSort}

  """
  # pylint: disable-msg=W0613
  job_ids = []
  for file_name in self._ListJobFiles():
    job_ids.append(self._ExtractJobID(file_name))
  return utils.NiceSort(job_ids)
941
def _ListJobFiles(self):
  """Returns the list of current job files.

  Only visible files in the queue directory whose name matches
  C{_RE_JOB_FILE} are returned.

  @rtype: list
  @return: the list of job file names

  """
  job_files = []
  for file_name in utils.ListVisibleFiles(constants.QUEUE_DIR):
    if self._RE_JOB_FILE.match(file_name):
      job_files.append(file_name)
  return job_files
951
952 - def _LoadJobUnlocked(self, job_id):
953 """Loads a job from the disk or memory. 954 955 Given a job id, this will return the cached job object if 956 existing, or try to load the job from the disk. If loading from 957 disk, it will also add the job to the cache. 958 959 @param job_id: the job id 960 @rtype: L{_QueuedJob} or None 961 @return: either None or the job object 962 963 """ 964 job = self._memcache.get(job_id, None) 965 if job: 966 logging.debug("Found job %s in memcache", job_id) 967 return job 968 969 filepath = self._GetJobPath(job_id) 970 logging.debug("Loading job from %s", filepath) 971 try: 972 raw_data = utils.ReadFile(filepath) 973 except IOError, err: 974 if err.errno in (errno.ENOENT, ): 975 return None 976 raise 977 978 data = serializer.LoadJson(raw_data) 979 980 try: 981 job = _QueuedJob.Restore(self, data) 982 except Exception, err: # pylint: disable-msg=W0703 983 new_path = self._GetArchivedJobPath(job_id) 984 if filepath == new_path: 985 # job already archived (future case) 986 logging.exception("Can't parse job %s", job_id) 987 else: 988 # non-archived case 989 logging.exception("Can't parse job %s, will archive.", job_id) 990 self._RenameFilesUnlocked([(filepath, new_path)]) 991 return None 992 993 self._memcache[job_id] = job 994 logging.debug("Added job %s to the cache", job_id) 995 return job
996
997 - def _GetJobsUnlocked(self, job_ids):
998 """Return a list of jobs based on their IDs. 999 1000 @type job_ids: list 1001 @param job_ids: either an empty list (meaning all jobs), 1002 or a list of job IDs 1003 @rtype: list 1004 @return: the list of job objects 1005 1006 """ 1007 if not job_ids: 1008 job_ids = self._GetJobIDsUnlocked() 1009 1010 return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
@staticmethod
def _IsQueueMarkedDrain():
  """Check if the queue is marked for drain.

  This currently uses the queue drain file, which makes it a
  per-node flag. In the future this can be moved to the config file.

  @rtype: boolean
  @return: True if the job queue is marked for draining

  """
  drain_file = constants.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
@staticmethod
def SetDrainFlag(drain_flag):
  """Sets the drain flag for the queue.

  This is similar to the function L{backend.JobQueueSetDrainFlag},
  and in the future we might merge them.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @rtype: boolean
  @return: always True

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  # The flag is the mere existence of the (empty) drain file
  utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  return True
@_RequireOpenQueue
def _SubmitJobUnlocked(self, job_id, ops):
  """Create and store a new job.

  This enters the job into our job queue and also puts it on the new
  queue, in order for it to be picked up by the queue processors.

  @type job_id: job ID
  @param job_id: the job ID for the new job
  @type ops: list
  @param ops: The list of OpCodes that will become the new job.
  @rtype: job ID
  @return: the job ID of the newly created job
  @raise errors.JobQueueDrainError: if the job queue is marked for draining
  @raise errors.JobQueueFull: if the number of job files has reached the
      hard limit

  """
  if self._IsQueueMarkedDrain():
    raise errors.JobQueueDrainError("Job queue is drained, refusing job")

  # Check job queue size
  queue_size = len(self._ListJobFiles())
  if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
    # TODO: Autoarchive jobs. Make sure it's not done on every job
    # submission, though.
    #queue_size = ...
    pass

  if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
    raise errors.JobQueueFull()

  new_job = _QueuedJob(self, job_id, ops)

  # Write to disk
  self.UpdateJobUnlocked(new_job)

  logging.debug("Adding new job %s to the cache", job_id)
  self._memcache[job_id] = new_job

  # Add to worker pool
  self._wpool.AddTask(new_job)

  return new_job.id
@utils.LockedMethod
@_RequireOpenQueue
def SubmitJob(self, ops):
  """Create and store a new job.

  A new job ID is allocated and the job is then submitted with it.

  @see: L{_SubmitJobUnlocked}

  """
  serials = self._NewSerialsUnlocked(1)
  return self._SubmitJobUnlocked(serials[0], ops)
1095 1096 @utils.LockedMethod 1097 @_RequireOpenQueue
1098 - def SubmitManyJobs(self, jobs):
1099 """Create and store multiple jobs. 1100 1101 @see: L{_SubmitJobUnlocked} 1102 1103 """ 1104 results = [] 1105 all_job_ids = self._NewSerialsUnlocked(len(jobs)) 1106 for job_id, ops in zip(all_job_ids, jobs): 1107 try: 1108 data = self._SubmitJobUnlocked(job_id, ops) 1109 status = True 1110 except errors.GenericError, err: 1111 data = str(err) 1112 status = False 1113 results.append((status, data)) 1114 1115 return results
1116 1117 @_RequireOpenQueue
def UpdateJobUnlocked(self, job):
  """Write a job's current state to its queue file.

  Has to be called whenever a job was modified, so that the change is
  stored on disk and replicated to the other nodes. Threads waiting
  on the job's change condition are woken up afterwards.

  @type job: L{_QueuedJob}
  @param job: the changed job

  """
  job_file = self._GetJobPath(job.id)
  serialized = serializer.DumpJson(job.Serialize(), indent=False)

  logging.debug("Writing job %s to %s", job.id, job_file)
  self._WriteAndReplicateFileUnlocked(job_file, serialized)

  # Let waiters re-check the job for changes
  job.change.notifyAll()
1136 1137 @utils.LockedMethod 1138 @_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Block until a job changes or the timeout expires.

  @type job_id: string
  @param job_id: Job identifier
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait
  @rtype: tuple (job info, log entries)
  @return: a tuple of the job information as required via
      the fields parameter, and the log entries as a list

      if the job has not changed and the timeout has expired,
      we instead return a special value,
      L{constants.JOB_NOTCHANGED}, which should be interpreted
      as such by the clients

      None is returned if the job could not be loaded

  """
  job = self._LoadJobUnlocked(job_id)
  if not job:
    logging.debug("Job %s not found", job_id)
    return None

  def _JsonRoundtrip(data):
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. Applying it here gives our data the
    # same modifications as the data received from the client; without
    # this, the comparison below might fail without the data being
    # significantly different.
    return serializer.LoadJson(serializer.DumpJson(data))

  def _PollJob():
    logging.debug("Waiting for changes in job %s", job_id)

    cur_status = job.CalcStatus()
    raw_info = self._GetJobInfoUnlocked(job, fields)
    raw_entries = job.GetLogEntries(prev_log_serial)

    cur_info = _JsonRoundtrip(raw_info)
    new_entries = _JsonRoundtrip(raw_entries)

    # Keep waiting only while the job can still change (a job which is no
    # longer queued, running or waiting for locks will not change anymore),
    # the requested fields are the same as last time and there is no log
    # entry with a different serial.
    if (cur_status in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) and
        not (prev_job_info != cur_info) and
        not (new_entries and prev_log_serial != new_entries[0][0])):
      raise utils.RetryAgain()

    logging.debug("Job %s changed", job_id)
    return (cur_info, new_entries)

  try:
    # Setting wait function to release the queue lock while waiting
    return utils.Retry(_PollJob, utils.RETRY_REMAINING_TIME, timeout,
                       wait_fn=job.change.wait)
  except utils.RetryTimeout:
    return constants.JOB_NOTCHANGED
1201 1202 @utils.LockedMethod 1203 @_RequireOpenQueue
def CancelJob(self, job_id):
  """Cancels a job.

  This will only succeed if the job has not started yet, i.e. while it
  is still queued or waiting for locks.

  @type job_id: string
  @param job_id: job ID of job to be cancelled.
  @rtype: tuple
  @return: C{(success, message)}, where C{success} is a boolean and
      C{message} a human-readable description of the outcome

  """
  logging.info("Cancelling job %s", job_id)

  job = self._LoadJobUnlocked(job_id)
  if not job:
    logging.debug("Job %s not found", job_id)
    return (False, "Job %s not found" % job_id)

  status = job.CalcStatus()

  if status == constants.JOB_STATUS_QUEUED:
    # Not picked up yet, cancel it right away
    self.CancelJobUnlocked(job)
    return (True, "Job %s canceled" % job.id)

  if status == constants.JOB_STATUS_WAITLOCK:
    # The worker will notice the new status and cancel the job
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
    finally:
      self.UpdateJobUnlocked(job)
    return (True, "Job %s will be canceled" % job.id)

  # Any other status means it's too late for cancelling
  logging.debug("Job %s is no longer waiting in the queue", job.id)
  return (False, "Job %s is no longer waiting in the queue" % job.id)
1238 1239 @_RequireOpenQueue
def CancelJobUnlocked(self, job):
  """Flag a job's unfinished opcodes as canceled and save the job.

  @type job: L{_QueuedJob}
  @param job: the job to cancel

  """
  reason = "Job canceled by request"

  try:
    job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, reason)
  finally:
    # Write the job out even if marking the opcodes failed
    self.UpdateJobUnlocked(job)
1249 1250 @_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
  """Archives jobs.

  Only jobs whose status is canceled, success or error are archived;
  all other jobs are skipped. If no job qualifies, nothing is renamed
  and 0 is returned.

  @type jobs: list of L{_QueuedJob}
  @param jobs: Job objects
  @rtype: int
  @return: Number of archived jobs

  """
  archive_jobs = []
  rename_files = []
  for job in jobs:
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      continue

    archive_jobs.append(job)

    # Remember the move from the queue path to the archive path
    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)
    rename_files.append((old, new))

  if not archive_jobs:
    # Nothing qualified: don't issue an empty rename request and don't
    # log a success message for jobs that were never archived
    return 0

  # TODO: What if 1..n files fail to rename?
  self._RenameFilesUnlocked(rename_files)

  logging.debug("Successfully archived job(s) %s",
                utils.CommaJoin(job.id for job in archive_jobs))

  return len(archive_jobs)
1282 1283 @utils.LockedMethod 1284 @_RequireOpenQueue
def ArchiveJob(self, job_id):
  """Archive a single job given its ID.

  Loads the job and passes it to L{_ArchiveJobsUnlocked}.

  @type job_id: string
  @param job_id: Job ID of job to be archived.
  @rtype: bool
  @return: Whether job was archived

  """
  logging.info("Archiving job %s", job_id)

  job = self._LoadJobUnlocked(job_id)
  if job:
    # Exactly one archived job means success
    archived = self._ArchiveJobsUnlocked([job])
    return archived == 1

  logging.debug("Job %s not found", job_id)
  return False
1304 1305 @utils.LockedMethod 1306 @_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout):
  """Archives all jobs based on age.

  The method will archive all jobs which are older than the age
  parameter. For jobs that don't have an end timestamp, the start
  timestamp will be considered (and, failing that, the received
  timestamp). The special '-1' age will cause archival of all jobs
  (that are not running or queued).

  @type age: int
  @param age: the minimum age in seconds
  @type timeout: number
  @param timeout: maximum number of seconds to spend examining jobs;
      once exceeded, no further jobs are looked at
  @rtype: tuple
  @return: C{(archived, left)}: the number of jobs archived and the
      number of jobs which were not examined because the timeout
      expired

  """
  logging.info("Archiving jobs with age more than %s seconds", age)

  now = time.time()
  end_time = now + timeout
  archived_count = 0
  last_touched = 0

  all_job_ids = self._GetJobIDsUnlocked(archived=False)
  pending = []
  for idx, job_id in enumerate(all_job_ids):
    # Not optimal because jobs could be pending
    # TODO: Measure average duration for job archival and take number of
    # pending jobs into account.
    if time.time() > end_time:
      break

    # Count the job as examined only after the timeout check, otherwise
    # the job we stopped at would be missing from the "left" count
    last_touched = idx + 1

    # Returns None if the job failed to load
    job = self._LoadJobUnlocked(job_id)
    if job:
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # Only the first element of the timestamp is compared against now
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

  if pending:
    archived_count += self._ArchiveJobsUnlocked(pending)

  return (archived_count, len(all_job_ids) - last_touched)
1360 1361 @staticmethod
1362 - def _GetJobInfoUnlocked(job, fields):
1363 """Returns information about a job. 1364 1365 @type job: L{_QueuedJob} 1366 @param job: the job which we query 1367 @type fields: list 1368 @param fields: names of fields to return 1369 @rtype: list 1370 @return: list with one element for each field 1371 @raise errors.OpExecError: when an invalid field 1372 has been passed 1373 1374 """ 1375 row = [] 1376 for fname in fields: 1377 if fname == "id": 1378 row.append(job.id) 1379 elif fname == "status": 1380 row.append(job.CalcStatus()) 1381 elif fname == "ops": 1382 row.append([op.input.__getstate__() for op in job.ops]) 1383 elif fname == "opresult": 1384 row.append([op.result for op in job.ops]) 1385 elif fname == "opstatus": 1386 row.append([op.status for op in job.ops]) 1387 elif fname == "oplog": 1388 row.append([op.log for op in job.ops]) 1389 elif fname == "opstart": 1390 row.append([op.start_timestamp for op in job.ops]) 1391 elif fname == "opexec": 1392 row.append([op.exec_timestamp for op in job.ops]) 1393 elif fname == "opend": 1394 row.append([op.end_timestamp for op in job.ops]) 1395 elif fname == "received_ts": 1396 row.append(job.received_timestamp) 1397 elif fname == "start_ts": 1398 row.append(job.start_timestamp) 1399 elif fname == "end_ts": 1400 row.append(job.end_timestamp) 1401 elif fname == "lock_status": 1402 row.append(job.lock_status) 1403 elif fname == "summary": 1404 row.append([op.input.Summary() for op in job.ops]) 1405 else: 1406 raise errors.OpExecError("Invalid job query field '%s'" % fname) 1407 return row
1408 1409 @utils.LockedMethod 1410 @_RequireOpenQueue
def QueryJobs(self, job_ids, fields):
  """Return the requested fields for a set of jobs.

  The jobs are obtained through L{_GetJobsUnlocked} and each of them is
  then queried using L{_GetJobInfoUnlocked}.

  @type job_ids: list
  @param job_ids: sequence of job identifiers or None for all
  @type fields: list
  @param fields: names of fields to return
  @rtype: list
  @return: list one element per job, each element being list with
      the requested fields, or None for a job returned as None by
      L{_GetJobsUnlocked}

  """
  result = []

  for job in self._GetJobsUnlocked(job_ids):
    info = None
    if job is not None:
      info = self._GetJobInfoUnlocked(job, fields)
    result.append(info)

  return result
1435 1436 @utils.LockedMethod 1437 @_RequireOpenQueue
def Shutdown(self):
  """Stops the job queue.

  This terminates all the worker threads and closes the queue.

  """
  workers = self._wpool
  workers.TerminateWorkers()

  # Close the queue lock and drop our reference to it
  self._queue_lock.Close()
  self._queue_lock = None
1448