Package ganeti :: Module mcpu
[hide private]
[frames] | no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module implementing the logic behind the cluster operations 
 32   
 33  This module implements the logic for doing operations in the cluster. There 
 34  are two kinds of classes defined: 
 35    - logical units, which know how to deal with their specific opcode only 
 36    - the processor, which dispatches the opcodes to their logical units 
 37   
 38  """ 
 39   
 40  import sys 
 41  import logging 
 42  import random 
 43  import time 
 44  import itertools 
 45  import traceback 
 46   
 47  from ganeti import opcodes 
 48  from ganeti import opcodes_base 
 49  from ganeti import constants 
 50  from ganeti import errors 
 51  from ganeti import hooksmaster 
 52  from ganeti import cmdlib 
 53  from ganeti import locking 
 54  from ganeti import utils 
 55  from ganeti import compat 
 56  from ganeti import wconfd 
 57   
 58   
 59  sighupReceived = [False] 
 60  lusExecuting = [0] 
 61   
 62  _OP_PREFIX = "Op" 
 63  _LU_PREFIX = "LU" 
 64   
 65   
66 -class LockAcquireTimeout(Exception):
67 """Exception to report timeouts on acquiring locks. 68 69 """
70 71
72 -def _CalculateLockAttemptTimeouts():
73 """Calculate timeouts for lock attempts. 74 75 """ 76 result = [constants.LOCK_ATTEMPTS_MINWAIT] 77 running_sum = result[0] 78 79 # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a 80 # blocking acquire 81 while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: 82 timeout = (result[-1] * 1.05) ** 1.25 83 84 # Cap max timeout. This gives other jobs a chance to run even if 85 # we're still trying to get our locks, before finally moving to a 86 # blocking acquire. 87 timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) 88 # And also cap the lower boundary for safety 89 timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) 90 91 result.append(timeout) 92 running_sum += timeout 93 94 return result
95 96
97 -class LockAttemptTimeoutStrategy(object):
98 """Class with lock acquire timeout strategy. 99 100 """ 101 __slots__ = [ 102 "_timeouts", 103 "_random_fn", 104 "_time_fn", 105 ] 106 107 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts() 108
109 - def __init__(self, _time_fn=time.time, _random_fn=random.random):
110 """Initializes this class. 111 112 @param _time_fn: Time function for unittests 113 @param _random_fn: Random number generator for unittests 114 115 """ 116 object.__init__(self) 117 118 self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT) 119 self._time_fn = _time_fn 120 self._random_fn = _random_fn
121
122 - def NextAttempt(self):
123 """Returns the timeout for the next attempt. 124 125 """ 126 try: 127 timeout = self._timeouts.next() 128 except StopIteration: 129 # No more timeouts, do blocking acquire 130 timeout = None 131 132 if timeout is not None: 133 # Add a small variation (-/+ 5%) to timeout. This helps in situations 134 # where two or more jobs are fighting for the same lock(s). 135 variation_range = timeout * 0.1 136 timeout += ((self._random_fn() * variation_range) - 137 (variation_range * 0.5)) 138 139 return timeout
140 141
142 -class OpExecCbBase(object): # pylint: disable=W0232
143 """Base class for OpCode execution callbacks. 144 145 """
146 - def NotifyStart(self):
147 """Called when we are about to execute the LU. 148 149 This function is called when we're about to start the lu's Exec() method, 150 that is, after we have acquired all locks. 151 152 """
153
154 - def NotifyRetry(self):
155 """Called when we are about to reset an LU to retry again. 156 157 This function is called after PrepareRetry successfully completed. 158 159 """
160
161 - def Feedback(self, *args):
162 """Sends feedback from the LU code to the end-user. 163 164 """
165
166 - def CurrentPriority(self): # pylint: disable=R0201
167 """Returns current priority or C{None}. 168 169 """ 170 return None 171
172 - def SubmitManyJobs(self, jobs):
173 """Submits jobs for processing. 174 175 See L{jqueue.JobQueue.SubmitManyJobs}. 176 177 """ 178 raise NotImplementedError
179 180
181 -def _LUNameForOpName(opname):
182 """Computes the LU name for a given OpCode name. 183 184 """ 185 assert opname.startswith(_OP_PREFIX), \ 186 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname) 187 188 return _LU_PREFIX + opname[len(_OP_PREFIX):]
189 190
191 -def _ComputeDispatchTable():
192 """Computes the opcode-to-lu dispatch table. 193 194 """ 195 return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__))) 196 for op in opcodes.OP_MAPPING.values() 197 if op.WITH_LU)
198 199
200 -def _SetBaseOpParams(src, defcomment, dst):
201 """Copies basic opcode parameters. 202 203 @type src: L{opcodes.OpCode} 204 @param src: Source opcode 205 @type defcomment: string 206 @param defcomment: Comment to specify if not already given 207 @type dst: L{opcodes.OpCode} 208 @param dst: Destination opcode 209 210 """ 211 if hasattr(src, "debug_level"): 212 dst.debug_level = src.debug_level 213 214 if (getattr(dst, "priority", None) is None and 215 hasattr(src, "priority")): 216 dst.priority = src.priority 217 218 if not getattr(dst, opcodes_base.COMMENT_ATTR, None): 219 dst.comment = defcomment 220 221 if hasattr(src, constants.OPCODE_REASON): 222 dst.reason = list(getattr(dst, constants.OPCODE_REASON, [])) 223 dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
224 225
226 -def _ProcessResult(submit_fn, op, result):
227 """Examines opcode result. 228 229 If necessary, additional processing on the result is done. 230 231 """ 232 if isinstance(result, cmdlib.ResultWithJobs): 233 # Copy basic parameters (e.g. priority) 234 map(compat.partial(_SetBaseOpParams, op, 235 "Submitted by %s" % op.OP_ID), 236 itertools.chain(*result.jobs)) 237 238 # Submit jobs 239 job_submission = submit_fn(result.jobs) 240 241 # Build dictionary 242 result = result.other 243 244 assert constants.JOB_IDS_KEY not in result, \ 245 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY 246 247 result[constants.JOB_IDS_KEY] = job_submission 248 249 return result
250 251
252 -def _FailingSubmitManyJobs(_):
253 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception. 254 255 """ 256 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g." 257 " queries) can not submit jobs")
258 259
260 -def _LockList(names):
261 """If 'names' is a string, make it a single-element list. 262 263 @type names: list or string or NoneType 264 @param names: Lock names 265 @rtype: a list of strings 266 @return: if 'names' argument is an iterable, a list of it; 267 if it's a string, make it a one-element list; 268 if L{locking.ALL_SET}, L{locking.ALL_SET} 269 270 """ 271 if names == locking.ALL_SET: 272 return names 273 elif isinstance(names, basestring): 274 return [names] 275 else: 276 return list(names)
277 278
279 -def _CheckSecretParameters(op):
280 """Check if secret parameters are expected, but missing. 281 282 """ 283 if hasattr(op, "osparams_secret") and op.osparams_secret: 284 for secret_param in op.osparams_secret: 285 if op.osparams_secret[secret_param].Get() == constants.REDACTED: 286 raise errors.OpPrereqError("Please re-submit secret parameters to job.", 287 errors.ECODE_INVAL)
288 289
290 -class Processor(object):
291 """Object which runs OpCodes""" 292 DISPATCH_TABLE = _ComputeDispatchTable() 293
294 - def __init__(self, context, ec_id, enable_locks=True):
295 """Constructor for Processor 296 297 @type context: GanetiContext 298 @param context: global Ganeti context 299 @type ec_id: string 300 @param ec_id: execution context identifier 301 302 """ 303 self._ec_id = ec_id 304 self._cbs = None 305 self.cfg = context.GetConfig(ec_id) 306 self.rpc = context.GetRpc(self.cfg) 307 self.hmclass = hooksmaster.HooksMaster 308 self._enable_locks = enable_locks 309 self.wconfd = wconfd # Indirection to allow testing 310 self._wconfdcontext = context.GetWConfdContext(ec_id)
311
312 - def _CheckLocksEnabled(self):
313 """Checks if locking is enabled. 314 315 @raise errors.ProgrammerError: In case locking is not enabled 316 317 """ 318 if not self._enable_locks: 319 raise errors.ProgrammerError("Attempted to use disabled locks")
320
321 - def _RequestAndWait(self, request, timeout):
322 """Request locks from WConfD and wait for them to be granted. 323 324 @type request: list 325 @param request: the lock request to be sent to WConfD 326 @type timeout: float 327 @param timeout: the time to wait for the request to be granted 328 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified 329 amount of time; in this case, locks still might be acquired or a request 330 pending. 331 332 """ 333 logging.debug("Trying %ss to request %s for %s", 334 timeout, request, self._wconfdcontext) 335 if self._cbs: 336 priority = self._cbs.CurrentPriority() # pylint: disable=W0612 337 else: 338 priority = None 339 340 if priority is None: 341 priority = constants.OP_PRIO_DEFAULT 342 343 ## Expect a signal 344 if sighupReceived[0]: 345 logging.warning("Ignoring unexpected SIGHUP") 346 sighupReceived[0] = False 347 348 # Request locks 349 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority, 350 request) 351 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext) 352 353 if pending: 354 def _HasPending(): 355 if sighupReceived[0]: 356 return self.wconfd.Client().HasPendingRequest(self._wconfdcontext) 357 else: 358 return True
359 360 pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout) 361 362 signal = sighupReceived[0] 363 364 if pending: 365 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext) 366 367 if pending and signal: 368 logging.warning("Ignoring unexpected SIGHUP") 369 sighupReceived[0] = False 370 371 logging.debug("Finished trying. Pending: %s", pending) 372 if pending: 373 raise LockAcquireTimeout()
374
375 - def _AcquireLocks(self, level, names, shared, opportunistic, timeout, 376 opportunistic_count=1, request_only=False):
377 """Acquires locks via the Ganeti lock manager. 378 379 @type level: int 380 @param level: Lock level 381 @type names: list or string 382 @param names: Lock names 383 @type shared: bool 384 @param shared: Whether the locks should be acquired in shared mode 385 @type opportunistic: bool 386 @param opportunistic: Whether to acquire opportunistically 387 @type timeout: None or float 388 @param timeout: Timeout for acquiring the locks 389 @type request_only: bool 390 @param request_only: do not acquire the locks, just return the request 391 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified 392 amount of time; in this case, locks still might be acquired or a request 393 pending. 394 395 """ 396 self._CheckLocksEnabled() 397 398 if self._cbs: 399 priority = self._cbs.CurrentPriority() # pylint: disable=W0612 400 else: 401 priority = None 402 403 if priority is None: 404 priority = constants.OP_PRIO_DEFAULT 405 406 if names == locking.ALL_SET: 407 if opportunistic: 408 expand_fns = { 409 locking.LEVEL_CLUSTER: (lambda: [locking.BGL]), 410 locking.LEVEL_INSTANCE: self.cfg.GetInstanceList, 411 locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList, 412 locking.LEVEL_NODE: self.cfg.GetNodeList, 413 locking.LEVEL_NODE_RES: self.cfg.GetNodeList, 414 locking.LEVEL_NETWORK: self.cfg.GetNetworkList, 415 } 416 names = expand_fns[level]() 417 else: 418 names = locking.LOCKSET_NAME 419 420 names = _LockList(names) 421 422 # For locks of the same level, the lock order is lexicographic 423 names.sort() 424 425 levelname = locking.LEVEL_NAMES[level] 426 427 locks = ["%s/%s" % (levelname, lock) for lock in list(names)] 428 429 if not names: 430 logging.debug("Acquiring no locks for (%s) at level %s", 431 self._wconfdcontext, levelname) 432 return [] 433 434 if shared: 435 request = [[lock, "shared"] for lock in locks] 436 else: 437 request = [[lock, "exclusive"] for lock in locks] 438 439 if request_only: 440 logging.debug("Lock request for level %s is %s", level, request) 441 return request 442 443 self.cfg.OutDate() 444 445 if timeout is None: 446 ## Note: once we are so desperate for locks to request them 447 ## unconditionally, we no longer care about an original plan 448 ## to acquire locks opportunistically. 449 logging.info("Definitely requesting %s for %s", 450 request, self._wconfdcontext) 451 ## The only way to be sure of not getting starved is to sequentially 452 ## acquire the locks one by one (in lock order). 453 for r in request: 454 logging.debug("Definite request %s for %s", r, self._wconfdcontext) 455 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority, 456 [r]) 457 while True: 458 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext) 459 if not pending: 460 break 461 time.sleep(10.0 * random.random()) 462 463 elif opportunistic: 464 logging.debug("For %ss trying to opportunistically acquire" 465 " at least %d of %s for %s.", 466 timeout, opportunistic_count, locks, self._wconfdcontext) 467 locks = utils.SimpleRetry( 468 lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion, 469 2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request]) 470 logging.debug("Managed to get the following locks: %s", locks) 471 if locks == []: 472 raise LockAcquireTimeout() 473 else: 474 self._RequestAndWait(request, timeout) 475 476 return locks
477
478 - def _ExecLU(self, lu):
479 """Logical Unit execution sequence. 480 481 """ 482 write_count = self.cfg.write_count 483 lu.cfg.OutDate() 484 lu.CheckPrereq() 485 486 hm = self.BuildHooksManager(lu) 487 try: 488 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) 489 except Exception, err: # pylint: disable=W0703 490 # This gives the LU a chance of cleaning up in case of an hooks failure. 491 # The type of exception is deliberately broad to be able to react to 492 # any kind of failure. 493 lu.HooksAbortCallBack(constants.HOOKS_PHASE_PRE, self.Log, err) 494 # We re-raise the exception to not alter the behavior of LU handling 495 # otherwise. 496 raise err 497 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, 498 self.Log, None) 499 500 if getattr(lu.op, "dry_run", False): 501 # in this mode, no post-hooks are run, and the config is not 502 # written (as it might have been modified by another LU, and we 503 # shouldn't do writeout on behalf of other threads 504 self.LogInfo("dry-run mode requested, not actually executing" 505 " the operation") 506 return lu.dry_run_result 507 508 if self._cbs: 509 submit_mj_fn = self._cbs.SubmitManyJobs 510 else: 511 submit_mj_fn = _FailingSubmitManyJobs 512 513 lusExecuting[0] += 1 514 try: 515 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log)) 516 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) 517 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, 518 self.Log, result) 519 finally: 520 # FIXME: This needs locks if not lu_class.REQ_BGL 521 lusExecuting[0] -= 1 522 if write_count != self.cfg.write_count: 523 hm.RunConfigUpdate() 524 525 return result
526
527 - def BuildHooksManager(self, lu):
528 return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
529
530 - def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
531 """Execute a Logical Unit, with the needed locks. 532 533 This is a recursive function that starts locking the given level, and 534 proceeds up, till there are no more locks to acquire. Then it executes the 535 given LU and its opcodes. 536 537 """ 538 pending = pending or [] 539 logging.debug("Looking at locks of level %s, still need to obtain %s", 540 level, pending) 541 adding_locks = level in lu.add_locks 542 acquiring_locks = level in lu.needed_locks 543 544 if level not in locking.LEVELS: 545 if pending: 546 self._RequestAndWait(pending, calc_timeout()) 547 lu.cfg.OutDate() 548 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) 549 pending = [] 550 551 logging.debug("Finished acquiring locks") 552 553 if self._cbs: 554 self._cbs.NotifyStart() 555 556 try: 557 result = self._ExecLU(lu) 558 except errors.OpPrereqError, err: 559 (_, ecode) = err.args 560 if ecode != errors.ECODE_TEMP_NORES: 561 raise 562 logging.debug("Temporarily out of resources; will retry internally") 563 try: 564 lu.PrepareRetry(self.Log) 565 if self._cbs: 566 self._cbs.NotifyRetry() 567 except errors.OpRetryNotSupportedError: 568 logging.debug("LU does not know how to retry.") 569 raise err 570 raise LockAcquireTimeout() 571 except AssertionError, err: 572 # this is a bit ugly, as we don't know from which phase 573 # (prereq, exec) this comes; but it's better than an exception 574 # with no information 575 (_, _, tb) = sys.exc_info() 576 err_info = traceback.format_tb(tb) 577 del tb 578 logging.exception("Detected AssertionError") 579 raise errors.OpExecError("Internal assertion error: please report" 580 " this as a bug.\nError message: '%s';" 581 " location:\n%s" % (str(err), err_info[-1])) 582 return result 583 584 # Determine if the acquiring is opportunistic up front 585 opportunistic = lu.opportunistic_locks[level] 586 587 dont_collate = lu.dont_collate_locks[level] 588 589 if dont_collate and pending: 590 self._RequestAndWait(pending, calc_timeout()) 591 lu.cfg.OutDate() 592 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) 593 pending = [] 594 595 if adding_locks and opportunistic: 596 # We could simultaneously acquire locks opportunistically and add new 597 # ones, but that would require altering the API, and no use cases are 598 # present in the system at the moment. 599 raise NotImplementedError("Can't opportunistically acquire locks when" 600 " adding new ones") 601 602 if adding_locks and acquiring_locks and \ 603 lu.needed_locks[level] == locking.ALL_SET: 604 # It would also probably be possible to acquire all locks of a certain 605 # type while adding new locks, but there is no use case at the moment. 606 raise NotImplementedError("Can't request all locks of a certain level" 607 " and add new locks") 608 609 if adding_locks or acquiring_locks: 610 self._CheckLocksEnabled() 611 612 lu.DeclareLocks(level) 613 share = lu.share_locks[level] 614 opportunistic_count = lu.opportunistic_locks_count[level] 615 616 try: 617 if acquiring_locks: 618 needed_locks = _LockList(lu.needed_locks[level]) 619 else: 620 needed_locks = [] 621 622 if adding_locks: 623 needed_locks.extend(_LockList(lu.add_locks[level])) 624 625 timeout = calc_timeout() 626 if timeout is not None and not opportunistic: 627 pending = pending + self._AcquireLocks(level, needed_locks, share, 628 opportunistic, timeout, 629 request_only=True) 630 else: 631 if pending: 632 self._RequestAndWait(pending, calc_timeout()) 633 lu.cfg.OutDate() 634 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) 635 pending = [] 636 self._AcquireLocks(level, needed_locks, share, opportunistic, 637 timeout, 638 opportunistic_count=opportunistic_count) 639 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) 640 641 result = self._LockAndExecLU(lu, level + 1, calc_timeout, 642 pending=pending) 643 finally: 644 levelname = locking.LEVEL_NAMES[level] 645 logging.debug("Freeing locks at level %s for %s", 646 levelname, self._wconfdcontext) 647 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname) 648 else: 649 result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending) 650 651 return result
652 653 # pylint: disable=R0201
654 - def _CheckLUResult(self, op, result):
655 """Check the LU result against the contract in the opcode. 656 657 """ 658 resultcheck_fn = op.OP_RESULT 659 if not (resultcheck_fn is None or resultcheck_fn(result)): 660 logging.error("Expected opcode result matching %s, got %s", 661 resultcheck_fn, result) 662 if not getattr(op, "dry_run", False): 663 # FIXME: LUs should still behave in dry_run mode, or 664 # alternately we should have OP_DRYRUN_RESULT; in the 665 # meantime, we simply skip the OP_RESULT check in dry-run mode 666 raise errors.OpResultError("Opcode result does not match %s: %s" % 667 (resultcheck_fn, utils.Truncate(result, 80)))
668
669 - def ExecOpCode(self, op, cbs, timeout=None):
670 """Execute an opcode. 671 672 @type op: an OpCode instance 673 @param op: the opcode to be executed 674 @type cbs: L{OpExecCbBase} 675 @param cbs: Runtime callbacks 676 @type timeout: float or None 677 @param timeout: Maximum time to acquire all locks, None for no timeout 678 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified 679 amount of time 680 681 """ 682 if not isinstance(op, opcodes.OpCode): 683 raise errors.ProgrammerError("Non-opcode instance passed" 684 " to ExecOpcode (%s)" % type(op)) 685 686 lu_class = self.DISPATCH_TABLE.get(op.__class__, None) 687 if lu_class is None: 688 raise errors.OpCodeUnknown("Unknown opcode") 689 690 if timeout is None: 691 calc_timeout = lambda: None 692 else: 693 calc_timeout = utils.RunningTimeout(timeout, False).Remaining 694 695 self._cbs = cbs 696 try: 697 if self._enable_locks: 698 # Acquire the Big Ganeti Lock exclusively if this LU requires it, 699 # and in a shared fashion otherwise (to prevent concurrent run with 700 # an exclusive LU. 701 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, 702 not lu_class.REQ_BGL, False, calc_timeout()) 703 elif lu_class.REQ_BGL: 704 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are" 705 " disabled" % op.OP_ID) 706 707 lu = lu_class(self, op, self.cfg, self.rpc, 708 self._wconfdcontext, self.wconfd) 709 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) 710 _CheckSecretParameters(op) 711 lu.ExpandNames() 712 assert lu.needed_locks is not None, "needed_locks not set by LU" 713 714 try: 715 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1, 716 calc_timeout) 717 finally: 718 if self._ec_id: 719 self.cfg.DropECReservations(self._ec_id) 720 finally: 721 self.wconfd.Client().FreeLocksLevel( 722 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER]) 723 self._cbs = None 724 725 self._CheckLUResult(op, result) 726 727 return result
728
729 - def Log(self, *args):
730 """Forward call to feedback callback function. 731 732 """ 733 if self._cbs: 734 self._cbs.Feedback(*args)
735
736 - def LogStep(self, current, total, message):
737 """Log a change in LU execution progress. 738 739 """ 740 logging.debug("Step %d/%d %s", current, total, message) 741 self.Log("STEP %d/%d %s" % (current, total, message))
742
743 - def LogWarning(self, message, *args, **kwargs):
744 """Log a warning to the logs and the user. 745 746 The optional keyword argument is 'hint' and can be used to show a 747 hint to the user (presumably related to the warning). If the 748 message is empty, it will not be printed at all, allowing one to 749 show only a hint. 750 751 """ 752 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \ 753 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs) 754 if args: 755 message = message % tuple(args) 756 if message: 757 logging.warning(message) 758 self.Log(" - WARNING: %s" % message) 759 if "hint" in kwargs: 760 self.Log(" Hint: %s" % kwargs["hint"])
761
762 - def LogInfo(self, message, *args):
763 """Log an informational message to the logs and the user. 764 765 """ 766 if args: 767 message = message % tuple(args) 768 logging.info(message) 769 self.Log(" - INFO: %s" % message)
770
771 - def GetECId(self):
772 """Returns the current execution context ID. 773 774 """ 775 if not self._ec_id: 776 raise errors.ProgrammerError("Tried to use execution context id when" 777 " not set") 778 return self._ec_id
779