
Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import constants
from ganeti import errors
from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import wconfd

sighupReceived = [False]
lusExecuting = [0]

_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])

class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result

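# A quick sanity sketch of the resulting schedule (illustrative, not part of
# the original module): the first entry is LOCK_ATTEMPTS_MINWAIT, every entry
# is clamped to [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT], and the total
# adds up to at least LOCK_ATTEMPTS_TIMEOUT before the final blocking
# acquire:
#
#   >>> ts = _CalculateLockAttemptTimeouts()
#   >>> ts[0] == constants.LOCK_ATTEMPTS_MINWAIT
#   True
#   >>> all(constants.LOCK_ATTEMPTS_MINWAIT <= t <= constants.LOCK_ATTEMPTS_MAXWAIT
#   ...     for t in ts)
#   True
#   >>> sum(ts) >= constants.LOCK_ATTEMPTS_TIMEOUT
#   True
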
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout

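# Minimal usage sketch (illustrative only, not part of the original module):
# a caller asks the strategy for per-attempt timeouts until it returns None,
# which signals that the next acquire should be the final, blocking one.
#
#   strategy = LockAttemptTimeoutStrategy()
#   while True:
#     timeout = strategy.NextAttempt()
#     # ... try to acquire the locks within `timeout` seconds; a timeout of
#     # None means "wait indefinitely" ...
#     if timeout is None:
#       break
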
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError

def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]

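# Example (illustrative): the mapping simply swaps the "Op" prefix for "LU",
# so an opcode class name resolves to the matching logical unit name:
#
#   >>> _LUNameForOpName("OpInstanceCreate")
#   'LUInstanceCreate'
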
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)

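# Sketch of the resulting mapping (illustrative; the exact entries depend on
# which opcodes declare WITH_LU): an entry such as
#
#   opcodes.OpClusterVerify -> cmdlib.LUClusterVerify
#
# lets the processor look up the logical unit class for an opcode instance
# via DISPATCH_TABLE.get(op.__class__).
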
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))

def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result

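# Illustrative sketch (opcode names assumed for the example): an LU that
# returns
#
#   cmdlib.ResultWithJobs([[op_a], [op_b]], other_key="value")
#
# causes two single-opcode jobs to be submitted via submit_fn, while the
# caller of _ProcessResult receives something like
#
#   {"other_key": "value",
#    constants.JOB_IDS_KEY: <return value of submit_fn>}
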
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")

def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance

  """
  if not __debug__:
    return

  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
  have_nal = locking.NAL in allocset

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET:
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")

def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' is an iterable, a list of its elements;
    if it's a string, a one-element list containing it;
    if L{locking.ALL_SET}, L{locking.ALL_SET} unchanged

  """
  if names == locking.ALL_SET:
    return names
  elif isinstance(names, basestring):
    return [names]
  else:
    return list(names)

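# Examples (illustrative lock names):
#
#   >>> _LockList("node1.example.com")
#   ['node1.example.com']
#   >>> _LockList(["node2.example.com", "node1.example.com"])
#   ['node2.example.com', 'node1.example.com']
#   >>> _LockList(locking.ALL_SET) == locking.ALL_SET
#   True
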
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      logging.debug("Trying %ss to request %s for %s",
                    timeout, request, self._wconfdcontext)
      ## Expect a signal
      if sighupReceived[0]:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

      # Request locks
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              request)
      pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending:
        def _HasPending():
          if sighupReceived[0]:
            return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          else:
            return True

        pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

        signal = sighupReceived[0]

        if pending:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

        if pending and signal:
          logging.warning("Ignoring unexpected SIGHUP")
          sighupReceived[0] = False

      logging.debug("Finished trying. Pending: %s", pending)
      if pending:
        # drop the pending request and all locks potentially obtained in the
        # time since the last poll
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
        raise LockAcquireTimeout()

    return locks

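  # Illustrative example (lock names assumed): a shared acquire of two node
  # locks sends a WConfd request of the form
  #
  #   [["node/node1.example.com", "shared"],
  #    ["node/node2.example.com", "shared"]]
  #
  # where the "node" prefix is the level name from locking.LEVEL_NAMES.
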
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      _VerifyLocks(lu)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        self._AcquireLocks(level, needed_locks, share, opportunistic,
                           calc_timeout(),
                           opportunistic_count=opportunistic_count)
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

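  # Minimal usage sketch (illustrative; the context, ec_id and callback
  # objects are assumed to come from the surrounding daemon, e.g. the job
  # queue, and the opcode is just an example):
  #
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(opcodes.OpClusterVerify(), cbs, timeout=30.0)
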
  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id