
Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import constants
from ganeti import errors
from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import wconfd

# Module-level mutable state; one-element lists are used so that the
# values can be updated in place by code elsewhere (e.g. a SIGHUP
# handler for sighupReceived).
sighupReceived = [False]
lusExecuting = [0]

_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result

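# Worked sketch (illustrative): each attempt timeout grows roughly as
# (previous * 1.05) ** 1.25, clamped between LOCK_ATTEMPTS_MINWAIT and
# LOCK_ATTEMPTS_MAXWAIT, and the series stops once its sum reaches
# LOCK_ATTEMPTS_TIMEOUT; hence:
#
#   >>> ts = _CalculateLockAttemptTimeouts()
#   >>> ts[0] == constants.LOCK_ATTEMPTS_MINWAIT
#   True
#   >>> all(t <= constants.LOCK_ATTEMPTS_MAXWAIT for t in ts)
#   True
#   >>> sum(ts) >= constants.LOCK_ATTEMPTS_TIMEOUT
#   True
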
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout

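# Usage sketch (illustrative): callers ask for the next attempt timeout
# until None is returned, at which point they fall back to a blocking
# acquire:
#
#   strategy = LockAttemptTimeoutStrategy()
#   timeout = strategy.NextAttempt()
#   while timeout is not None:
#     # ... try to acquire the locks within 'timeout' seconds ...
#     timeout = strategy.NextAttempt()
#   # ... timeout is None: acquire the locks blockingly ...
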
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def NotifyRetry(self):
    """Called when we are about to reset an LU for a retry.

    This function is called after PrepareRetry has completed successfully.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError

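# Minimal subclass sketch (hypothetical; the real implementation lives in
# the job queue): only Feedback() needs overriding to surface LU messages:
#
#   class _LoggingCbs(OpExecCbBase):
#     def Feedback(self, *args):
#       logging.info("LU feedback: %s", (args,))
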
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]

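# Example (illustrative):
#
#   >>> _LUNameForOpName("OpClusterVerify")
#   'LUClusterVerify'
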
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)

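# Example entry (illustrative; assumes the opcode class has WITH_LU set):
#
#   >>> table = _ComputeDispatchTable()
#   >>> table[opcodes.OpClusterVerify].__name__
#   'LUClusterVerify'
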
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))

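# Effect sketch (illustrative): for a parent opcode 'src' carrying a
# priority and a reason trail, and a child opcode 'dst' without them:
#
#   _SetBaseOpParams(src, "Submitted by %s" % src.OP_ID, dst)
#
# copies src.priority and src.debug_level to dst, sets dst.comment to the
# default comment (only if dst has none), and appends src's reason trail
# to dst's.
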
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result

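# Result shape sketch (illustrative): when an LU returns a
# cmdlib.ResultWithJobs whose 'jobs' attribute is [[op1], [op2a, op2b]]
# and whose 'other' dictionary is {"foo": "bar"}, the jobs are submitted
# via submit_fn and the final result becomes:
#
#   {"foo": "bar", constants.JOB_IDS_KEY: <value returned by submit_fn>}
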
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) cannot submit jobs")

def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' is an iterable, a list of its elements;
    if it's a string, a single-element list containing it;
    if L{locking.ALL_SET}, L{locking.ALL_SET} unchanged

  """
  if names == locking.ALL_SET:
    return names
  elif isinstance(names, basestring):
    return [names]
  else:
    return list(names)

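# Examples (illustrative):
#
#   >>> _LockList("node1.example.com")
#   ['node1.example.com']
#   >>> _LockList(["node1.example.com", "node2.example.com"])
#   ['node1.example.com', 'node2.example.com']
#   >>> _LockList(locking.ALL_SET) is locking.ALL_SET
#   True
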
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor.

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether the processor is allowed to acquire locks

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in the
      specified amount of time; in this case, locks still might be acquired
      or a request may still be pending.

    """
    logging.debug("Trying for %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

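  # Request format sketch (matches what _AcquireLocks builds below): a
  # list of [name, mode] pairs, where the name is "<levelname>/<lockname>"
  # and the mode is "shared" or "exclusive", e.g.:
  #
  #   [["instance/inst1.example.com", "exclusive"],
  #    ["instance/inst2.example.com", "exclusive"]]
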
  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: minimum number of locks to acquire in
      opportunistic mode
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @raise LockAcquireTimeout: In case locks couldn't be acquired in the
      specified amount of time; in this case, locks still might be acquired
      or a request may still be pending.

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are desperate enough to request locks unconditionally,
      ## we no longer care about any original plan to acquire them
      ## opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

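  # Call sketch (illustrative): exclusively acquire two instance locks,
  # non-opportunistically, waiting up to 10 seconds:
  #
  #   self._AcquireLocks(locking.LEVEL_INSTANCE,
  #                      ["inst1.example.com", "inst2.example.com"],
  #                      False, False, 10.0)
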
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, until there are no more locks to acquire. Then it executes
    the given LU and its opcodes.

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError, err:
        (_, ecode) = err.args
        if ecode != errors.ECODE_TEMP_NORES:
          raise
        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        raise LockAcquireTimeout()
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))
      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                   pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

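  # Sketch (illustrative): an opcode whose OP_RESULT only accepts
  # dictionaries would make a string result from the LU fail the check
  # above and raise errors.OpResultError (except in dry-run mode).
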
  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

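  # Usage sketch (hypothetical; in practice the job queue drives this
  # through its runtime callbacks):
  #
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(opcodes.OpClusterVerify(), cbs, timeout=10.0)
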
  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The only supported keyword argument is 'hint', which can be used to
    show a hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to show
    only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id