Package ganeti :: Module mcpu
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module implementing the logic behind the cluster operations 
 32   
 33  This module implements the logic for doing operations in the cluster. There 
 34  are two kinds of classes defined: 
 35    - logical units, which know how to deal with their specific opcode only 
 36    - the processor, which dispatches the opcodes to their logical units 
 37   
 38  """ 
 39   
 40  import sys 
 41  import logging 
 42  import random 
 43  import time 
 44  import itertools 
 45  import traceback 
 46   
 47  from ganeti import opcodes 
 48  from ganeti import opcodes_base 
 49  from ganeti import constants 
 50  from ganeti import errors 
 51  from ganeti import hooksmaster 
 52  from ganeti import cmdlib 
 53  from ganeti import locking 
 54  from ganeti import utils 
 55  from ganeti import compat 
 56   
 57   
# Naming-convention prefixes used to map opcode classes to their logical
# units (e.g. "OpClusterVerify" -> "LUClusterVerify")
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
 73   
 74   
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allowed time.

  """
79 80
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  @rtype: list of float
  @return: Per-attempt timeouts, starting at the minimum wait time and
    growing until their sum covers the total attempt timeout

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout faster than linearly, but clamp it into
    # [MINWAIT, MAXWAIT]: the upper cap gives other jobs a chance to run
    # while we're still trying to get our locks, the lower cap is for safety
    grown = (timeouts[-1] * 1.05) ** 1.25
    clamped = max(min(grown, constants.LOCK_ATTEMPTS_MAXWAIT),
                  constants.LOCK_ATTEMPTS_MINWAIT)

    timeouts.append(clamped)
    total += clamped

  return timeouts
104 105
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Yields an increasing sequence of per-attempt timeouts (with a +/- 5%
  random variation), followed by C{None} once all timed attempts are used
  up, signalling a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Pre-computed once, shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: Timeout in seconds, or C{None} when all timed attempts have
      been exhausted (caller should do a blocking acquire)

    """
    try:
      # Built-in next() instead of the iterator's .next() method; it is
      # equivalent on Python >= 2.6 and also works on Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
149 150
class OpExecCbBase(object): # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  Subclasses override the notifications they care about; the defaults are
  no-ops, except L{SubmitManyJobs} which must be provided by the subclass.

  """
  def NotifyStart(self):
    """Called right before the LU's Exec() method runs.

    At this point all required locks have already been acquired.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
181 182
def _LUNameForOpName(opname):
  """Derives the logical unit class name from an opcode class name.

  @type opname: string
  @param opname: OpCode class name, must start with the "Op" prefix

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the "Op" prefix for "LU", keeping the rest of the name
  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
191 192
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: Mapping from opcode class to the LU class implementing it, for
    all opcodes declaring C{WITH_LU}

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
200 201
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Debug level is propagated unconditionally if the source has one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Priority is only inherited when the destination doesn't set its own
  if (hasattr(src, "priority") and
      getattr(dst, "priority", None) is None):
    dst.priority = src.priority

  # Fall back to the default comment when none was given
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  # Reason trail: keep the destination's entries and append the source's
  if hasattr(src, constants.OPCODE_REASON):
    existing = getattr(dst, constants.OPCODE_REASON, [])
    dst.reason = existing
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
226 227
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: Callable submitting jobs, see
    L{OpExecCbBase.SubmitManyJobs}
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced the result
  @param result: Raw result from the LU's Exec(); only
    L{cmdlib.ResultWithJobs} instances get special treatment
  @return: The (possibly rewritten) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every submitted opcode.
    # An explicit loop instead of map(): the call is made purely for its
    # side effects, and map() is lazy on Python 3 (would silently do
    # nothing there)
    for nested_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, nested_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
252 253
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback when no real callbacks are available
  (e.g. for queries), where submitting jobs is a programming error.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
260 261
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes exempt from the "same lock mode" check
    (exposed for unittests)
  @param _nal_whitelist: LU classes exempt from the "must hold node
    allocation lock" check (exposed for unittests)

  """
  # All checks below are assertions; skip everything when running with
  # optimizations enabled (python -O), where asserts are stripped anyway
  if not __debug__:
    return

  # Whether the LU currently owns the node allocation lock
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        # Whitelisted LUs are expected to deliberately use differing modes
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        # All other LUs must use the node allocation lock in the same
        # shared/exclusive mode as the node (resource) locks
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        # Whitelisted LUs must NOT hold the node allocation lock at all
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        # Acquiring all node(-resource) locks requires holding the node
        # allocation lock as well
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
298 299
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the LU class implementing it
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may acquire locks; when
      disabled, opcodes whose LU requires the BGL are rejected

    """
    self.context = context
    self._ec_id = ec_id
    # Callbacks (L{OpExecCbBase}) for the opcode currently being executed;
    # set for the duration of ExecOpCode() only
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Use the job's current priority when available so that
    # higher-priority jobs win contended acquires
    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, pre-hooks, Exec and post-hooks for the given LU, and
    triggers a config-update hook run if the configuration was written.

    """
    # Remember the config write counter so we can tell afterwards whether
    # this LU modified (and wrote) the configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      # No callbacks available (e.g. queries): job submission is an error
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for the given LU.

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the last lock level: all locks are held, run the LU itself
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Locks added here are removed again after execution (see the
          # finally clause around the recursive call below)
          lu.remove_locks[level] = add_locks

          try:
            glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          # Recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            glm.remove(level, lu.remove_locks[level])
      finally:
        if glm.is_owned(level):
          glm.release(level)

    else:
      # Nothing to lock at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @raise errors.OpResultError: When the result doesn't match the
      opcode's C{OP_RESULT} check (unless running in dry-run mode)

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      # Remaining() yields the time left of the overall budget, shared by
      # all lock acquisitions of this opcode
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @type current: int
    @param current: Current step number
    @type total: int
    @param total: Total number of steps
    @type message: string
    @param message: Message describing the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: When no execution context ID was set at
      construction time

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
613