Package ganeti :: Module mcpu

Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import constants
from ganeti import errors
from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import wconfd


sighupReceived = [False]
lusExecuting = [0]

_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result

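# Illustrative sketch (not part of the original module): a hypothetical helper
# showing how the schedule above behaves.  The per-attempt timeouts grow
# progressively, are clamped to the range
# [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT], and their sum reaches at
# least LOCK_ATTEMPTS_TIMEOUT before the final, blocking acquire.
def _ExampleDescribeTimeoutSchedule():
  """Returns (number of timed attempts, total non-blocking wait); example only."""
  timeouts = _CalculateLockAttemptTimeouts()
  return (len(timeouts), sum(timeouts))
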
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout

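# Illustrative sketch (not part of the original module): how a caller might
# drive LockAttemptTimeoutStrategy.  "acquire_fn" is a hypothetical callable
# taking a timeout in seconds (or None for a blocking attempt) and returning
# True once the locks were obtained.
def _ExampleAcquireWithStrategy(acquire_fn):
  """Retries acquire_fn with growing timeouts, then blocks (example only)."""
  strategy = LockAttemptTimeoutStrategy()
  while True:
    timeout = strategy.NextAttempt()
    if acquire_fn(timeout):
      return True
    if timeout is None:
      # Even the blocking attempt failed; let the caller decide what to do.
      return False
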
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def NotifyRetry(self):
    """Called when we are about to reset an LU to retry again.

    This function is called after PrepareRetry successfully completed.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError

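# Illustrative sketch (not part of the original module): a minimal concrete
# callback implementation.  A real integration (e.g. the job queue) would
# forward Feedback() to the job log and implement SubmitManyJobs(); this
# hypothetical stub only writes to the daemon log.
class _ExampleLoggingCallbacks(OpExecCbBase):
  def Feedback(self, *args):
    logging.info("LU feedback: %s", args)

  def CurrentPriority(self):
    return constants.OP_PRIO_DEFAULT
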
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]

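# Example (illustrative):
#   >>> _LUNameForOpName("OpClusterVerify")
#   'LUClusterVerify'
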
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)

def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))

def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result

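# Illustrative note (not part of the original module): for an LU returning
# cmdlib.ResultWithJobs, the value handed back to the caller is the LU's
# "other" dictionary plus the job submission results stored under
# constants.JOB_IDS_KEY, e.g. (hypothetical values and submission format):
#   {"some_key": "some_value", constants.JOB_IDS_KEY: [(True, "1234")]}
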
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")

def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' argument is an iterable, a list of it;
      if it's a string, make it a one-element list;
      if L{locking.ALL_SET}, L{locking.ALL_SET}

  """
  if names == locking.ALL_SET:
    return names
  elif isinstance(names, basestring):
    return [names]
  else:
    return list(names)

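# Examples (illustrative, hypothetical lock names):
#   >>> _LockList("node1.example.com")
#   ['node1.example.com']
#   >>> _LockList(["node2.example.com", "node1.example.com"])
#   ['node2.example.com', 'node1.example.com']
#   >>> _LockList(locking.ALL_SET) is locking.ALL_SET
#   True
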
def _CheckSecretParameters(op):
  """Check if secret parameters are expected, but missing.

  """
  if hasattr(op, "osparams_secret") and op.osparams_secret:
    for secret_param in op.osparams_secret:
      if op.osparams_secret[secret_param].Get() == constants.REDACTED:
        raise errors.OpPrereqError("Please re-submit secret parameters to job.",
                                   errors.ECODE_INVAL)

class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

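  # Illustrative note (not part of the original module): the "request" built
  # above and sent to WConfD is a list of [lock-name, mode] pairs, where the
  # lock name is "<levelname>/<resource>", e.g. (hypothetical names):
  #   [["instance/inst1.example.com", "shared"],
  #    ["node/node1.example.com", "exclusive"]]
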
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError, err:
        (_, ecode) = err.args
        if ecode != errors.ECODE_TEMP_NORES:
          raise
        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        raise LockAcquireTimeout()
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      _CheckSecretParameters(op)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

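  # Illustrative sketch (not part of the original module): how a caller such
  # as a job executor might drive ExecOpCode.  "context" is assumed to be an
  # initialized GanetiContext and "cbs" an OpExecCbBase implementation; both
  # come from the surrounding daemon and are hypothetical here.
  #
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(opcodes.OpClusterVerify(), cbs, timeout=10.0)
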
  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id