Package ganeti :: Module mcpu
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module implementing the logic behind the cluster operations 
 32   
 33  This module implements the logic for doing operations in the cluster. There 
 34  are two kinds of classes defined: 
 35    - logical units, which know how to deal with their specific opcode only 
 36    - the processor, which dispatches the opcodes to their logical units 
 37   
 38  """ 
 39   
 40  import sys 
 41  import logging 
 42  import random 
 43  import time 
 44  import itertools 
 45  import traceback 
 46   
 47  from ganeti import opcodes 
 48  from ganeti import opcodes_base 
 49  from ganeti import constants 
 50  from ganeti import errors 
 51  from ganeti import hooksmaster 
 52  from ganeti import cmdlib 
 53  from ganeti import locking 
 54  from ganeti import utils 
 55  from ganeti import compat 
 56  from ganeti import wconfd 
 57   
 58   
# Shared mutable state: single-element lists so the values can be flipped in
# place from outside this module.  sighupReceived is checked and reset while
# waiting for locks from WConfD (presumably set by a SIGHUP handler installed
# elsewhere — not visible in this module); lusExecuting counts the LUs
# currently inside their Exec() phase.
sighupReceived = [False]
lusExecuting = [0]

# Naming-convention prefixes used to map opcode class names ("OpXyz") to
# their logical unit class names ("LUXyz").
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
 64   
 65   
class LockAcquireTimeout(Exception):
  """Raised when a set of locks could not be acquired within the timeout.

  Note that when this is raised, the locks may nevertheless have been
  acquired in the meantime, or a request for them may still be pending.

  """
70 71
def _CalculateLockAttemptTimeouts():
  """Compute the sequence of per-attempt lock timeouts.

  @rtype: list of float
  @return: increasing timeouts, starting at LOCK_ATTEMPTS_MINWAIT and
    summing to at least LOCK_ATTEMPTS_TIMEOUT

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Keep adding attempts until the accumulated waiting time reaches
  # LOCK_ATTEMPTS_TIMEOUT; after that a blocking acquire is performed.
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout, then clamp it into
    # [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT] so other jobs still
    # get a chance to run while we are retrying.
    grown = (timeouts[-1] * 1.05) ** 1.25
    clamped = max(min(grown, constants.LOCK_ATTEMPTS_MAXWAIT),
                  constants.LOCK_ATTEMPTS_MINWAIT)

    timeouts.append(clamped)
    total += clamped

  return timeouts
95 96
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Yields a series of growing, slightly randomized timeouts via
  L{NextAttempt}, followed by C{None} once a blocking acquire should be
  performed.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Shared, precomputed timeout sequence; each instance iterates its own copy
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: timeout in seconds for the next attempt, or C{None} once the
      finite timeouts are exhausted and a blocking acquire should be done

    """
    try:
      # Use the next() builtin (available since Python 2.6) instead of the
      # Python-2-only iterator.next() method, for forward compatibility.
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140 141
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  Subclasses override these hooks to be informed about the life cycle of a
  logical unit and to provide services (feedback, priority, job submission)
  to the processor.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU's Exec() method runs.

    At this point all required locks have already been acquired.

    """

  def NotifyRetry(self):
    """Hook invoked when an LU is about to be reset for another attempt.

    Only called after PrepareRetry completed successfully.

    """

  # TODO: Cleanup calling conventions, make them explicit.
  def Feedback(self, *args):
    """Forwards feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: always; subclasses supporting job
      submission must override this method

    """
    raise NotImplementedError
180 181
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode class name, expected to start with "Op"
  @rtype: string
  @return: the corresponding logical unit class name ("LU" + suffix)

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the "Op" prefix for "LU", keeping the rest of the name intact
  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
190 191
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: mapping from each opcode class with C{WITH_LU} set to the
    cmdlib logical unit class handling it

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
199 200
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Propagate the debug level whenever the source defines one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Only fill in the priority if the destination has none of its own
  if hasattr(src, "priority") and getattr(dst, "priority", None) is None:
    dst.priority = src.priority

  # Fall back to the default comment when none was given
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  # Reason trail: destination's own entries first, then the source's
  if hasattr(src, constants.OPCODE_REASON):
    combined = list(getattr(dst, constants.OPCODE_REASON, []))
    combined.extend(getattr(src, constants.OPCODE_REASON, []))
    dst.reason = combined
225 226
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: callable used to submit jobs, see
    L{OpExecCbBase.SubmitManyJobs}
  @param op: the opcode that produced C{result}
  @param result: the raw result of the LU's C{Exec} method
  @return: the (possibly augmented) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every submitted opcode.  An
    # explicit loop is used instead of map(): map() is lazy under Python 3,
    # so the side effects would silently never run, and using it purely for
    # side effects is unidiomatic anyway.
    for nested_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, nested_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
251 252
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback for opcodes processed without
  callbacks (e.g. queries), which must never submit jobs.

  @raise errors.ProgrammerError: always

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")
  raise errors.ProgrammerError(msg)
259 260
def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' argument is an iterable, a list of it;
    if it's a string, make it a one-element list;
    if L{locking.ALL_SET}, L{locking.ALL_SET}

  """
  # The ALL_SET sentinel must be passed through untouched
  if names == locking.ALL_SET:
    return names
  # A single lock name becomes a one-element list
  if isinstance(names, basestring):
    return [names]
  # Any other iterable is materialized into a fresh list
  return list(names)
278 279
class Processor(object):
  """Object which runs OpCodes"""
  # Maps opcode classes to their LU classes; computed once at class
  # creation time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: if False, any attempt to acquire locks raises
      L{errors.ProgrammerError} (see L{_CheckLocksEnabled})

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    # Clear any SIGHUP seen before we even asked for locks; it cannot be
    # a notification about this request
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        # Only re-query WConfD after a SIGHUP was seen; otherwise keep
        # reporting "still pending" so SimpleRetry keeps waiting
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        # Double-check with WConfD before declaring a timeout
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: minimum number of locks to acquire when
      acquiring opportunistically
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        # Opportunistic acquisition needs explicit names, so expand
        # ALL_SET through the configuration for the given level
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          # Randomized sleep to avoid several waiters polling in lockstep
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, the pre-hooks, Exec and the post-hooks, writing the
    configuration update hook if the config was modified.

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    # Factory for the hooks manager; overridable via self.hmclass for tests
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param pending: lock requests collected at previous levels which have
      not been sent to WConfD yet

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the last lock level: flush any pending requests, then run the LU
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError, err:
        if len(err.args) < 2 or err.args[1] != errors.ECODE_TEMP_NORES:
          raise

        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        # Reuse the lock-timeout path to trigger a retry of the whole opcode
        raise LockAcquireTimeout()
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))
      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      # This level must not be collated with earlier ones: flush first
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          # Collate this level's request with the pending ones; it will be
          # sent to WConfD at a later level (or before executing the LU)
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @raise errors.OpResultError: if the result does not match the opcode's
      C{OP_RESULT} check (skipped in dry-run mode)

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      # Shared countdown across all lock levels
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
760