Package ganeti :: Module mcpu
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module implementing the logic behind the cluster operations 
 32   
 33  This module implements the logic for doing operations in the cluster. There 
 34  are two kinds of classes defined: 
 35    - logical units, which know how to deal with their specific opcode only 
 36    - the processor, which dispatches the opcodes to their logical units 
 37   
 38  """ 
 39   
 40  import sys 
 41  import logging 
 42  import random 
 43  import time 
 44  import itertools 
 45  import traceback 
 46   
 47  from ganeti import opcodes 
 48  from ganeti import opcodes_base 
 49  from ganeti import constants 
 50  from ganeti import errors 
 51  from ganeti import hooksmaster 
 52  from ganeti import cmdlib 
 53  from ganeti import locking 
 54  from ganeti import utils 
 55  from ganeti import compat 
 56  from ganeti import wconfd 
 57   
 58   
# Module-level flags kept in one-element lists so they can be mutated in
# place by other code sharing the module namespace.
sighupReceived = [False]  # NOTE(review): presumably set by a SIGHUP handler elsewhere; only read/reset here -- confirm
lusExecuting = [0]  # number of LUs currently executing (incremented/decremented in _ExecLU)

# Class-name prefixes used to map OpCode class names to LU class names
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
 64   
 65   
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allotted time.

  """
70 71
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  @rtype: list of float
  @return: increasing per-attempt timeouts whose sum is at least
      L{constants.LOCK_ATTEMPTS_TIMEOUT}

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Keep adding attempts until their combined duration reaches the point
  # at which a blocking acquire will be done instead
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout, then clamp it into the allowed window so
    # other jobs still get a chance to run while we keep trying
    grown = (timeouts[-1] * 1.05) ** 1.25
    next_timeout = max(constants.LOCK_ATTEMPTS_MINWAIT,
                       min(grown, constants.LOCK_ATTEMPTS_MAXWAIT))

    timeouts.append(next_timeout)
    total += next_timeout

  return timeouts
95 96
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Produces a series of increasing, slightly jittered timeouts, followed by
  C{None} once all timed attempts are exhausted (signalling a blocking
  acquire).

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once; shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: next timeout jittered by -/+ 5%, or C{None} when no timed
        attempts remain

    """
    try:
      # Use the builtin next() instead of the Python 2-only .next()
      # method; both behave identically on Python 2.6+ and this also
      # works on Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140 141
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Invoked just before the LU's Exec() method runs.

    By the time this is called, all required locks have already been
    acquired.

    """

  def NotifyRetry(self):
    """Invoked just before an LU is reset for another attempt.

    Only called once PrepareRetry has completed successfully.

    """

  def Feedback(self, *args):
    """Relays feedback messages from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns the current priority, or C{None} if none is known.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
179 180
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode class name; must begin with L{_OP_PREFIX}
  @rtype: string
  @return: name of the corresponding LU class

  """
  prefix_len = len(_OP_PREFIX)
  assert opname[:prefix_len] == _OP_PREFIX, \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the "Op" prefix for "LU"
  return _LU_PREFIX + opname[prefix_len:]
189 190
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: mapping from opcode class to the L{cmdlib} LU class
      implementing it (only opcodes with C{WITH_LU} set)

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      # The LU class is looked up by its derived name, e.g. OpFoo -> LUFoo
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
198 199
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    # Propagate the debug level of the original opcode
    dst.debug_level = src.debug_level

  if hasattr(src, "priority") and getattr(dst, "priority", None) is None:
    # Only inherit the priority if the destination does not set its own
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    # Append the source's reason trail to whatever the destination has
    combined = list(getattr(dst, constants.OPCODE_REASON, []))
    combined.extend(getattr(src, constants.OPCODE_REASON, []))
    dst.reason = combined
224 225
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: callable submitting a list of jobs, see
      L{OpExecCbBase.SubmitManyJobs}
  @param op: the opcode that produced C{result}
  @param result: return value of the LU's Exec() method
  @return: C{result} unchanged, or, for L{cmdlib.ResultWithJobs}, the
      C{other} dictionary extended with the submitted job IDs

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of the
    # submitted jobs.  An explicit loop is used instead of map(): map() is
    # eager only on Python 2, and relying on it for side effects is
    # fragile and non-idiomatic.
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
250 251
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the submit callback when opcodes are processed without runtime
  callbacks (e.g. queries), where job submission must not happen.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
258 259
def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' argument is an iterable, a list of it;
      if it's a string, make it a one-element list;
      if L{locking.ALL_SET}, L{locking.ALL_SET}

  """
  # The ALL_SET marker is passed through untouched
  if names == locking.ALL_SET:
    return names

  # A bare string denotes a single lock name (basestring also covers
  # unicode strings on Python 2)
  if isinstance(names, basestring):
    return [names]

  return list(names)
277 278
def _CheckSecretParameters(op):
  """Check if secret parameters are expected, but missing.

  @param op: the opcode to check
  @raise errors.OpPrereqError: if any secret OS parameter still holds the
      redacted placeholder instead of an actual value

  """
  secrets = getattr(op, "osparams_secret", None)
  if not secrets:
    return

  for name in secrets:
    # A redacted placeholder means the caller did not re-send the real
    # secret value together with the job
    if secrets[name].Get() == constants.REDACTED:
      raise errors.OpPrereqError("Please re-submit secret parameters to job.",
                                 errors.ECODE_INVAL)
288 289
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the LU class implementing it; computed once
  # at class-definition time
  DISPATCH_TABLE = _ComputeDispatchTable()
  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether locks may be acquired; when disabled,
        opcodes whose LU requires the BGL are rejected (see
        L{_PrepareLockListsAndExecLU})

    """
    self._ec_id = ec_id
    # Runtime callbacks (L{OpExecCbBase}); set per opcode in ExecOpCode
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    # Hooks manager; built lazily in _ExecLU
    self._hm = None
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)
312
313 - def _CheckLocksEnabled(self):
314 """Checks if locking is enabled. 315 316 @raise errors.ProgrammerError: In case locking is not enabled 317 318 """ 319 if not self._enable_locks: 320 raise errors.ProgrammerError("Attempted to use disabled locks")
321
  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    # Determine the request priority; fall back to the default when no
    # callbacks are set or they do not provide one
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    # Clear any SIGHUP received before the request is even submitted; it
    # cannot be related to this request
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        # Only poll WConfD after a SIGHUP was observed; otherwise report
        # "still pending" without a round-trip.  NOTE(review): presumably
        # WConfD notifies lock-state changes via SIGHUP -- confirm.
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      # Remember whether a signal arrived during the retry loop ...
      signal = sighupReceived[0]

      if pending:
        # ... and double-check with WConfD before concluding the request
        # is still pending
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
        sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()
375
  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: minimum number of locks to acquire when
        acquiring opportunistically
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a request
        pending.

    """
    self._CheckLocksEnabled()

    # Determine the request priority; default when no callbacks provide one
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        # Opportunistic acquire needs explicit names; expand ALL_SET to
        # the current members of the requested level
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    # WConfD lock names are "<level>/<name>"
    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          # Randomized sleep to avoid lock-step polling by multiple jobs
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks
478
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, the pre hooks, the LU's Exec method (or the dry-run
    path) and the post hooks, keeping the global LU counter up to date.

    @param lu: the logical unit to execute
    @return: the (possibly post-hook-adjusted) result of the LU's Exec
        method, or C{lu.dry_run_result} in dry-run mode

    """
    # Remember the config write counter so a config update hook can be
    # triggered below if the configuration was written
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    self._hm = self.BuildHooksManager(lu)
    try:
      # Run hooks twice: first for the global hooks, then for the usual hooks.
      self._hm.RunPhase(constants.HOOKS_PHASE_PRE, is_global=True)
      h_results = self._hm.RunPhase(constants.HOOKS_PHASE_PRE)
    except Exception, err: # pylint: disable=W0703
      # This gives the LU a chance of cleaning up in case of an hooks failure.
      # The type of exception is deliberately broad to be able to react to
      # any kind of failure.
      lu.HooksAbortCallBack(constants.HOOKS_PHASE_PRE, self.Log, err)
      # We re-raise the exception to not alter the behavior of LU handling
      # otherwise.
      raise err
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      # Without callbacks (e.g. queries) job submission must fail loudly
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = self._hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        self._hm.RunConfigUpdate()

    return result
529
530 - def BuildHooksManager(self, lu):
531 return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu, 532 self.GetECId())
533
  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the current lock level
    @param calc_timeout: callable returning the remaining lock timeout,
        or C{None} for no timeout
    @type pending: list or None
    @param pending: lock requests collected so far but not yet sent to
        WConfD (collated across levels)

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # All lock levels handled: flush any outstanding requests, then
      # actually run the LU
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError, err:
        # Only a temporary lack of resources is retried; everything else
        # propagates to the caller
        (_, ecode) = err.args
        if ecode != errors.ECODE_TEMP_NORES:
          raise
        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        # Signal the retry to the caller by reusing the lock-timeout path
        raise LockAcquireTimeout()
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))
      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      # This level must not be collated with earlier ones; flush first
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          # Timed, non-opportunistic acquires are only collected here and
          # sent in one request later (collation)
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          # Blocking or opportunistic acquire: flush collected requests
          # first, then acquire this level immediately
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

    return result
656 657 # pylint: disable=R0201
658 - def _CheckLUResult(self, op, result):
659 """Check the LU result against the contract in the opcode. 660 661 """ 662 resultcheck_fn = op.OP_RESULT 663 if not (resultcheck_fn is None or resultcheck_fn(result)): 664 logging.error("Expected opcode result matching %s, got %s", 665 resultcheck_fn, result) 666 if not getattr(op, "dry_run", False): 667 # FIXME: LUs should still behave in dry_run mode, or 668 # alternately we should have OP_DRYRUN_RESULT; in the 669 # meantime, we simply skip the OP_RESULT check in dry-run mode 670 raise errors.OpResultError("Opcode result does not match %s: %s" % 671 (resultcheck_fn, utils.Truncate(result, 80)))
672
  def _PrepareLockListsAndExecLU(self, op, lu_class, calc_timeout):
    """Execute an opcode.

    @param op: the opcode to be executed
    @param lu_class: the LU class implementing the current opcode
    @param calc_timeout: The function calculating the time remaining
        to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @return: the result of the executed LU

    """
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      # Reject opcodes whose secret parameters were only submitted redacted
      _CheckSecretParameters(op)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        # Recurse into the per-level lock acquisition and LU execution,
        # starting just above the cluster (BGL) level
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      # Always release the BGL-level locks and detach the callbacks
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    return result
714
  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.OpCodeUnknown: if no LU is registered for the opcode
    @return: the LU result, after validation against the opcode's
        C{OP_RESULT} contract

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      result = self._PrepareLockListsAndExecLU(op, lu_class, calc_timeout)

      # The post hooks below are always executed with a SUCCESS status because
      # all the possible errors during pre hooks and LU execution cause
      # exception and therefore the statement below will be skipped.
      if self._hm is not None:
        self._hm.RunPhase(constants.HOOKS_PHASE_POST, is_global=True,
                          post_status=constants.POST_HOOKS_STATUS_SUCCESS)
    except:
      # execute global post hooks with the failed status on any exception
      # (deliberately a bare except so even non-Exception errors trigger
      # the failure hooks before being re-raised)
      hooksmaster.ExecGlobalPostHooks(op.OP_ID, self.cfg.GetMasterNodeName(),
                                      self.rpc.call_hooks_runner,
                                      logging.warning,
                                      self.cfg.GetClusterName(),
                                      self.cfg.GetMasterNode(), self.GetECId(),
                                      constants.POST_HOOKS_STATUS_ERROR)
      raise

    self._CheckLUResult(op, result)
    return result
763
764 - def Log(self, *args):
765 """Forward call to feedback callback function. 766 767 """ 768 if self._cbs: 769 self._cbs.Feedback(*args)
770
771 - def LogStep(self, current, total, message):
772 """Log a change in LU execution progress. 773 774 """ 775 logging.debug("Step %d/%d %s", current, total, message) 776 self.Log("STEP %d/%d %s" % (current, total, message))
777
778 - def LogWarning(self, message, *args, **kwargs):
779 """Log a warning to the logs and the user. 780 781 The optional keyword argument is 'hint' and can be used to show a 782 hint to the user (presumably related to the warning). If the 783 message is empty, it will not be printed at all, allowing one to 784 show only a hint. 785 786 """ 787 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \ 788 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs) 789 if args: 790 message = message % tuple(args) 791 if message: 792 logging.warning(message) 793 self.Log(" - WARNING: %s" % message) 794 if "hint" in kwargs: 795 self.Log(" Hint: %s" % kwargs["hint"])
796
797 - def LogInfo(self, message, *args):
798 """Log an informational message to the logs and the user. 799 800 """ 801 if args: 802 message = message % tuple(args) 803 logging.info(message) 804 self.Log(" - INFO: %s" % message)
805
806 - def GetECId(self):
807 """Returns the current execution context ID. 808 809 """ 810 if not self._ec_id: 811 raise errors.ProgrammerError("Tried to use execution context id when" 812 " not set") 813 return self._ec_id
814