Package ganeti :: Module mcpu
[hide private]
[frames | no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the logic behind the cluster operations 
 23   
 24  This module implements the logic for doing operations in the cluster. There 
 25  are two kinds of classes defined: 
 26    - logical units, which know how to deal with their specific opcode only 
 27    - the processor, which dispatches the opcodes to their logical units 
 28   
 29  """ 
 30   
 31  import sys 
 32  import logging 
 33  import random 
 34  import time 
 35  import itertools 
 36  import traceback 
 37   
 38  from ganeti import opcodes 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import hooksmaster 
 42  from ganeti import cmdlib 
 43  from ganeti import locking 
 44  from ganeti import utils 
 45  from ganeti import compat 
 46   
 47   
 48  _OP_PREFIX = "Op" 
 49  _LU_PREFIX = "LU" 
 50   
 51  #: LU classes which don't need to acquire the node allocation lock 
 52  #: (L{locking.NAL}) when they acquire all node or node resource locks 
 53  _NODE_ALLOC_WHITELIST = frozenset([]) 
 54   
 55  #: LU classes which don't need to acquire the node allocation lock 
 56  #: (L{locking.NAL}) in the same mode (shared/exclusive) as the node 
 57  #: or node resource locks 
 58  _NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([ 
 59    cmdlib.LUBackupExport, 
 60    cmdlib.LUBackupRemove, 
 61    cmdlib.LUOobCommand, 
 62    ]) 
 63   
 64   
class LockAcquireTimeout(Exception):
  """Signals that locks could not be acquired within the allowed time.

  This is a plain C{Exception} subclass without any additional state.

  """
69 70
def _CalculateLockAttemptTimeouts():
  """Builds the list of timeouts for successive lock attempts.

  The list starts with C{constants.LOCK_ATTEMPTS_MINWAIT}. Every further
  entry is computed from its predecessor as C{(previous * 1.05) ** 1.25} and
  clamped to the range between C{constants.LOCK_ATTEMPTS_MINWAIT} and
  C{constants.LOCK_ATTEMPTS_MAXWAIT}. Entries are appended until their sum
  reaches C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list
  @return: Timeouts in the order in which they should be used

  """
  lower_bound = constants.LOCK_ATTEMPTS_MINWAIT
  upper_bound = constants.LOCK_ATTEMPTS_MAXWAIT

  previous = lower_bound
  timeouts = [previous]
  total = previous

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    grown = (previous * 1.05) ** 1.25

    # Cap the upper boundary first; this gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving to
    # a blocking acquire. Then cap the lower boundary for safety.
    previous = max(min(grown, upper_bound), lower_bound)

    timeouts.append(previous)
    total += previous

  return timeouts
94 95
class LockAttemptTimeoutStrategy(object):
  """Hands out the timeout to use for each successive lock acquire attempt.

  Every instance walks once through the precomputed
  C{_TIMEOUT_PER_ATTEMPT} list. Once that list is used up, L{NextAttempt}
  returns C{None}, meaning a blocking acquire should be done.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  #: Base timeouts, computed once when the class is created
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Starts a fresh pass over the per-attempt timeouts.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._random_fn = _random_fn
    self._time_fn = _time_fn
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: Next base timeout with a random variation applied, or C{None}
      once all timeouts have been handed out

    """
    try:
      base = self._timeouts.next()
    except StopIteration:
      base = None

    if base is None:
      # No more timeouts, do blocking acquire
      return None

    # Add a small variation (-/+ 5%) to the timeout. This helps in
    # situations where two or more jobs are fighting for the same lock(s).
    spread = base * 0.1
    return base + ((self._random_fn() * spread) - (spread * 0.5))
139 140
class OpExecCbBase: # pylint: disable=W0232
  """Base class for callbacks used while executing an OpCode.

  All methods except L{SubmitManyJobs} have harmless default
  implementations, so subclasses only override what they need.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU is executed.

    It is called when the LU's Exec() method is about to be started, i.e.
    once all locks have been acquired. The default does nothing.

    """

  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    The default implementation discards the arguments.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    The default implementation always returns C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, unless overridden by a subclass

    """
    raise NotImplementedError()
171 172
def _LUNameForOpName(opname):
  """Derives the logical unit name from an OpCode name.

  The leading C{_OP_PREFIX} of the opcode name is replaced by C{_LU_PREFIX}.

  @type opname: string
  @param opname: OpCode class name; must start with C{_OP_PREFIX}
  @rtype: string
  @return: Name of the corresponding LU class

  """
  assert opname.startswith(_OP_PREFIX), \
    ("Invalid OpCode name, doesn't start with %s: %s" %
     (_OP_PREFIX, opname))

  suffix = opname[len(_OP_PREFIX):]

  return _LU_PREFIX + suffix
181 182
def _ComputeDispatchTable():
  """Builds the mapping from opcode classes to logical unit classes.

  Only opcodes from C{opcodes.OP_MAPPING} whose C{WITH_LU} attribute is true
  are included; the LU class is looked up in L{cmdlib} by name.

  @rtype: dict
  @return: Dictionary with opcode classes as keys and LU classes as values

  """
  table = {}

  for op_class in opcodes.OP_MAPPING.values():
    if op_class.WITH_LU:
      lu_name = _LUNameForOpName(op_class.__name__)
      table[op_class] = getattr(cmdlib, lu_name)

  return table
190 191
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  If C{src} has a debug level, it is copied to C{dst}. The priority of
  C{src} is only copied if C{dst} has none yet. The comment of C{dst} is set
  to C{defcomment} if it is missing or empty. If C{src} has a reason
  attribute, its entries are appended to those of C{dst}.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    # Build a new list instead of extending the existing one in place. The
    # list found on the destination may be shared with other opcodes (or be
    # the very list held by the source); extending it in place would modify
    # those as well and make entries pile up.
    reason = list(getattr(dst, constants.OPCODE_REASON, []))
    reason.extend(getattr(src, constants.OPCODE_REASON, []))
    dst.reason = reason
216 217
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  Results which are not L{cmdlib.ResultWithJobs} instances are returned
  unchanged. Otherwise basic parameters of C{op} are copied to every opcode
  of the contained jobs, the jobs are handed to C{submit_fn}, and the value
  returned by C{submit_fn} is stored under C{constants.JOB_IDS_KEY} in the
  result's additional values (C{result.other}), which are then returned.

  @type submit_fn: callable
  @param submit_fn: Function called with C{result.jobs} to submit the jobs
  @type op: L{opcodes.OpCode}
  @param op: Opcode whose execution produced the result
  @param result: Value returned by the logical unit
  @return: Processed result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority). An explicit loop is used
    # because this is done purely for its side effects on the opcodes.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
242 243
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  It is used when an opcode is processed without callbacks, in which case
  jobs can not be submitted.

  @raise errors.ProgrammerError: Always

  """
  message = ("Opcodes processed without callbacks (e.g."
             " queries) can not submit jobs")

  raise errors.ProgrammerError(message)
250 251
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  All checks are assertions; nothing is done unless C{__debug__} is set.
  For the node and node resource levels declared in the LU's needed locks
  it is verified that the sharing mode matches the one of the node
  allocation lock (or differs, for LUs in C{_mode_whitelist}) and that the
  node allocation lock is held when all locks of the level are used (or not
  held, for LUs in C{_nal_whitelist}).

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes allowed to use a different mode for the
    node allocation lock
  @param _nal_whitelist: LU classes which must not hold the node allocation
    lock

  """
  if not __debug__:
    return

  lu_class = lu.__class__
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
    # TODO: Verify using actual lock mode, not using LU variables
    if level not in lu.needed_locks:
      continue

    share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
    share_level = lu.share_locks[level]

    # Check the sharing mode first
    if lu_class in _mode_whitelist:
      assert share_node_alloc != share_level, \
        "LU is whitelisted to use different modes for node allocation lock"
    else:
      assert bool(share_node_alloc) == bool(share_level), \
        ("Node allocation lock must be acquired using the same mode as nodes"
         " and node resources")

    # Then check whether the node allocation lock is held as required
    if lu_class in _nal_whitelist:
      assert not have_nal, \
        "LU is whitelisted for not acquiring the node allocation lock"
    elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
      assert have_nal, \
        ("Node allocation lock must be used if an LU acquires all nodes"
         " or node resources")
288 289
class Processor(object):
  """Object which runs OpCodes"""
  #: Maps opcode classes to the logical unit classes implementing them
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: Whether this processor may use locks

    """
    self.context = context
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._ec_id = ec_id
    self._enable_locks = enable_locks
    # Callbacks are only set while an opcode is being executed
    self._cbs = None

  def _CheckLocksEnabled(self):
    """Makes sure locking is enabled for this processor.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if self._enable_locks:
      return

    raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    The priority passed to the lock manager is taken from the callbacks, if
    any are set, and is C{None} otherwise.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @return: Value returned by the lock manager's C{acquire} method
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    priority = None
    if self._cbs:
      priority = self._cbs.CurrentPriority()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    if acquired is not None:
      return acquired

    # The lock manager returns None if it couldn't get the locks
    raise LockAcquireTimeout()

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-phase hooks, executes the LU and
    runs the post-phase hooks. In dry-run mode execution stops after the
    pre-phase hooks and the LU's C{dry_run_result} is returned.

    @param lu: Logical unit to execute
    @return: Result of the LU after processing by L{_ProcessResult} and the
      post-phase hooks callback, or C{lu.dry_run_result} in dry-run mode

    """
    writes_before = self.context.cfg.write_count
    lu.CheckPrereq()

    hooks_mgr = self.BuildHooksManager(lu)
    pre_results = hooks_mgr.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, pre_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    # Without callbacks there is nobody to submit jobs to
    submit_mj_fn = _FailingSubmitManyJobs
    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      post_results = hooks_mgr.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, post_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if writes_before != self.context.cfg.write_count:
        # The configuration was written in the meantime (also checked if
        # execution failed)
        hooks_mgr.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Creates the hooks manager for a logical unit.

    @param lu: Logical unit to build the hooks manager for
    @return: Object built by C{self.hmclass.BuildFromLu}

    """
    build_fn = self.hmclass.BuildFromLu

    return build_fn(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit to execute
    @type level: int
    @param level: Lock level to handle in this call
    @type calc_timeout: callable
    @param calc_timeout: Function returning the timeout to use for acquiring
      locks
    @return: Result of executing the LU
    @raise errors.OpExecError: If the LU raised an C{AssertionError}
    @raise errors.OpPrereqError: If locks couldn't be added
    @raise NotImplementedError: If the LU wants to both add and acquire locks
      at the same level

    """
    glm = self.context.glm
    is_adding = level in lu.add_locks
    is_acquiring = level in lu.needed_locks

    if level not in locking.LEVELS:
      # All levels have been handled, the LU can be executed now
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        return self._ExecLU(lu)
      except AssertionError:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, err, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    if is_adding and is_acquiring:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    if not (is_adding or is_acquiring):
      # Nothing to do at this level, go on with the next one
      return self._LockAndExecLU(lu, level + 1, calc_timeout)

    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]
    opportunistic = lu.opportunistic_locks[level]

    try:
      assert is_adding ^ is_acquiring, \
        "Locks must be either added or acquired"

      if is_acquiring:
        # Acquiring locks
        self._AcquireLocks(level, lu.needed_locks[level], share,
                           opportunistic, calc_timeout())
      else:
        # Adding locks; they are also registered for removal once the
        # higher levels are done
        new_locks = lu.add_locks[level]
        lu.remove_locks[level] = new_locks

        try:
          glm.add(level, new_locks, acquired=1, shared=share)
        except errors.LockError:
          logging.exception("Detected lock error in level %s for locks"
                            " %s, shared=%s", level, new_locks, share)
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), most likely because of another"
            " job who added them first" % new_locks,
            errors.ECODE_NOTUNIQUE)

      try:
        return self._LockAndExecLU(lu, level + 1, calc_timeout)
      finally:
        if level in lu.remove_locks:
          glm.remove(level, lu.remove_locks[level])
    finally:
      # Whatever is still owned at this level is released
      if glm.is_owned(level):
        glm.release(level)

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: Result of the opcode
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: If C{op} is not an opcode, or if it
      requires the BGL while locks are disabled
    @raise errors.OpCodeUnknown: If no logical unit is known for the opcode
    @raise errors.OpResultError: If the result doesn't pass the opcode's
      result check (not raised in dry-run mode)

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      def calc_timeout():
        """Returns C{None}, i.e. no timeout."""
        return None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if resultcheck_fn is not None and not resultcheck_fn(result):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn,
                                    utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing happens if no callbacks are set.

    """
    callbacks = self._cbs
    if callbacks:
      callbacks.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: Number of the current step
    @param total: Total number of steps
    @param message: Description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: Warning text; used as a format string if positional
      arguments are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

    if args:
      message = message % tuple(args)

    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)

    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: Message text; used as a format string if positional
      arguments are given

    """
    if args:
      message = message % tuple(args)

    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID is set

    """
    if self._ec_id:
      return self._ec_id

    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
596