Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import constants
from ganeti import errors
from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import wconfd


sighupReceived = [False]
lusExecuting = [0]

_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])

class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result

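# Illustrative sketch (added for exposition; the concrete values of
# LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT and LOCK_ATTEMPTS_TIMEOUT are
# assumptions here, not taken from constants.py). With minwait=1.0,
# maxwait=15.0 and total=75.0, the series starts at 1.0, grows by
# (t * 1.05) ** 1.25 per step, is clamped to [1.0, 15.0], and stops once its
# sum reaches 75.0:
#
#   timeouts = [1.0]
#   while sum(timeouts) < 75.0:
#     timeouts.append(max(min((timeouts[-1] * 1.05) ** 1.25, 15.0), 1.0))
#
# Each entry is the timeout of one non-blocking attempt; afterwards the
# caller falls back to a blocking acquire (see NextAttempt below).
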
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout

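# A minimal usage sketch (illustrative only; _TryAcquire is a hypothetical
# helper, not part of this module): one strategy instance is created per
# lock acquisition and NextAttempt() is consulted before every try. A
# returned timeout of None means the timed attempts are exhausted and the
# caller should acquire blockingly.
#
#   strategy = LockAttemptTimeoutStrategy()
#   while True:
#     timeout = strategy.NextAttempt()
#     if _TryAcquire(timeout):  # hypothetical; returns False on timeout
#       break
#     if timeout is None:
#       break  # a blocking acquire (timeout None) cannot time out
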
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def NotifyRetry(self):
    """Called when we are about to reset an LU to retry it again.

    This function is called after PrepareRetry has successfully completed.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns the current priority, or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError

def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]

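# For example (illustrative): _LUNameForOpName("OpInstanceStartup") returns
# "LUInstanceStartup".
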
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)

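# The resulting table maps opcode classes to LU classes by name for every
# opcode class whose WITH_LU attribute is true, e.g. (illustrative)
# opcodes.OpInstanceStartup -> cmdlib.LUInstanceStartup.
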
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))

def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result

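# Illustrative sketch (assuming cmdlib.ResultWithJobs stores its positional
# job lists in ".jobs" and extra keyword arguments in ".other"): an LU
# returning
#
#   cmdlib.ResultWithJobs([[op_a], [op_b]], instances=["inst1"])
#
# causes two jobs to be submitted via submit_fn, and _ProcessResult returns
# {"instances": ["inst1"], constants.JOB_IDS_KEY: <submission result>}.
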
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")

def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance

  """
  if not __debug__:
    return

  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
  have_nal = locking.NAL in allocset

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET:
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")

def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' is an iterable, a list of its elements;
      if it's a string, a one-element list containing it;
      if L{locking.ALL_SET}, L{locking.ALL_SET} unchanged

  """
  if names == locking.ALL_SET:
    return names
  elif isinstance(names, basestring):
    return [names]
  else:
    return list(names)

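# Examples (illustrative):
#   _LockList("node1.example.com")  -> ["node1.example.com"]
#   _LockList(["node1", "node2"])   -> ["node1", "node2"]
#   _LockList(locking.ALL_SET)      -> locking.ALL_SET (unchanged)
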
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired within the
        specified amount of time; in this case, locks may still have been
        acquired or a request may still be pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @raise LockAcquireTimeout: In case locks couldn't be acquired within the
        specified amount of time; in this case, locks may still have been
        acquired or a request may still be pending.

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, until there are no more locks to acquire. Then it executes
    the given LU and its opcodes.

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      _VerifyLocks(lu)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError, err:
        (_, ecode) = err.args
        if ecode != errors.ECODE_TEMP_NORES:
          raise
        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        raise LockAcquireTimeout()
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                   pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired within the
        specified amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent runs
        # with an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

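  # Minimal usage sketch (illustrative only; obtaining a real GanetiContext
  # and execution context id is the job queue's business and outside the
  # scope of this module; OpClusterQuery stands in for any opcode that has
  # an LU in the dispatch table):
  #
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(opcodes.OpClusterQuery(), cbs=None)
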
  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id