Package ganeti :: Module mcpu
[hide private]
[frames | no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the logic behind the cluster operations 
 23   
 24  This module implements the logic for doing operations in the cluster. There 
 25  are two kinds of classes defined: 
 26    - logical units, which know how to deal with their specific opcode only 
 27    - the processor, which dispatches the opcodes to their logical units 
 28   
 29  """ 
 30   
 31  import sys 
 32  import logging 
 33  import random 
 34  import time 
 35  import itertools 
 36  import traceback 
 37   
 38  from ganeti import opcodes 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import hooksmaster 
 42  from ganeti import cmdlib 
 43  from ganeti import locking 
 44  from ganeti import utils 
 45  from ganeti import compat 
 46   
 47   
#: Prefix expected at the start of opcode class names; it is stripped by
#: L{_LUNameForOpName} when deriving the logical unit name
_OP_PREFIX = "Op"
#: Prefix put in front of the remainder of the opcode name by
#: L{_LUNameForOpName} to build the logical unit class name
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
 63   
 64   
class LockAcquireTimeout(Exception):
  """Signals that locks could not be acquired within the allowed time.

  Raised by the processor when the lock manager reports that acquiring
  the requested locks did not succeed before the timeout expired.

  """
69 70
71 -def _CalculateLockAttemptTimeouts():
72 """Calculate timeouts for lock attempts. 73 74 """ 75 result = [constants.LOCK_ATTEMPTS_MINWAIT] 76 running_sum = result[0] 77 78 # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a 79 # blocking acquire 80 while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: 81 timeout = (result[-1] * 1.05) ** 1.25 82 83 # Cap max timeout. This gives other jobs a chance to run even if 84 # we're still trying to get our locks, before finally moving to a 85 # blocking acquire. 86 timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) 87 # And also cap the lower boundary for safety 88 timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) 89 90 result.append(timeout) 91 running_sum += timeout 92 93 return result
94 95
class LockAttemptTimeoutStrategy(object):
  """Hands out the timeouts to use for successive lock acquire attempts.

  Every instance walks once through the pre-computed list of timeouts; when
  the list is exhausted, C{None} is returned to request a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  #: Timeouts shared by all instances, computed once at class creation
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests (only stored by the code
        visible in this class)
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._random_fn = _random_fn
    self._time_fn = _time_fn
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: None or float
    @return: Timeout with a small random variation applied, or C{None} if
        all timeouts have been used and a blocking acquire should be done

    """
    # Fetch exactly one item from the iterator; the "else" branch is only
    # taken if the iterator is exhausted
    for base_timeout in self._timeouts:
      break
    else:
      # No more timeouts, do blocking acquire
      base_timeout = None

    if base_timeout is None:
      return None

    # Add a small variation (-/+ 5%) to timeout. This helps in situations
    # where two or more jobs are fighting for the same lock(s).
    variation_range = base_timeout * 0.1
    variation = ((self._random_fn() * variation_range) -
                 (variation_range * 0.5))

    return base_timeout + variation
139 140
class OpExecCbBase: # pylint: disable=W0232
  """Interface for the callbacks used while an opcode is executed.

  The default implementations do nothing, except for L{SubmitManyJobs},
  which has to be provided by subclasses.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU is executed.

    It is called when the LU's Exec() method is about to be started, i.e.
    once all locks have been acquired.

    """
    pass

  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    """
    pass

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, unless overridden

    """
    raise NotImplementedError
171 172
def _LUNameForOpName(opname):
  """Derives the name of the logical unit class from an opcode class name.

  The opcode prefix (L{_OP_PREFIX}) is replaced with the logical unit prefix
  (L{_LU_PREFIX}); the remainder of the name is kept unchanged.

  @type opname: string
  @param opname: Name of the opcode class
  @rtype: string
  @return: Name of the corresponding logical unit class

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  remainder = opname[len(_OP_PREFIX):]

  return _LU_PREFIX + remainder
181 182
def _ComputeDispatchTable():
  """Builds the table mapping opcode classes to logical unit classes.

  Only opcodes whose C{WITH_LU} attribute is true are included; the logical
  unit class is looked up in L{cmdlib} by its derived name.

  @rtype: dict
  @return: Dictionary with opcode classes as keys and LU classes as values

  """
  table = {}

  for op in opcodes.OP_MAPPING.values():
    if not op.WITH_LU:
      continue

    table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))

  return table
190 191
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is copied whenever the source has one. The priority is only
  copied if the destination has none (or C{None}) and the source has one. The
  comment is set to C{defcomment} if the destination has no non-empty comment.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode, modified in place

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # Check and assignment must refer to the same attribute, hence the name
  # defined by the opcodes module is used for both
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
212 213
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for a
  L{cmdlib.ResultWithJobs} the contained jobs are submitted and the outcome
  of the submission is stored in the returned dictionary.

  @type submit_fn: callable
  @param submit_fn: Function called with the list of jobs to submit
  @type op: L{opcodes.OpCode}
  @param op: Opcode which produced the result; its basic parameters are
      copied to the opcodes of the submitted jobs
  @param result: Result as returned by the logical unit
  @return: For L{cmdlib.ResultWithJobs} its additional return values with
      the job submission outcome added under L{constants.JOB_IDS_KEY}, for
      everything else the unmodified result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority); an explicit loop is used as this
    # is done purely for its side effects
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
238 239
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  @raise errors.ProgrammerError: Always

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")

  raise errors.ProgrammerError(msg)
246 247
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Sanity-checks the locks acquired on behalf of a logical unit.

  All checks are assertions and the function returns immediately if
  C{__debug__} is false. For the node and node resource levels requested by
  the LU, the usage of the node allocation lock (L{locking.NAL}) is verified.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes which must use a sharing mode for the
      node allocation lock differing from the one of the checked level
  @param _nal_whitelist: LU classes which must not own the node allocation
      lock

  """
  if not __debug__:
    return

  owns_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level not in lu.needed_locks:
      continue

    nal_shared = lu.share_locks[locking.LEVEL_NODE_ALLOC]
    level_shared = lu.share_locks[level]

    if lu.__class__ in _mode_whitelist:
      assert nal_shared != level_shared, \
        "LU is whitelisted to use different modes for node allocation lock"
    else:
      assert bool(nal_shared) == bool(level_shared), \
        ("Node allocation lock must be acquired using the same mode as nodes"
         " and node resources")

    if lu.__class__ in _nal_whitelist:
      assert not owns_nal, \
        "LU is whitelisted for not acquiring the node allocation lock"
    elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
      assert owns_nal, \
        ("Node allocation lock must be used if an LU acquires all nodes"
         " or node resources")
284 285
class Processor(object):
  """Runs opcodes by dispatching them to their logical units.

  """
  #: Maps opcode classes to the logical unit classes implementing them
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Initializes the processor.

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may use locks

    """
    self._ec_id = ec_id
    self._enable_locks = enable_locks
    self._cbs = None
    self.context = context
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster

  def _CheckLocksEnabled(self):
    """Makes sure locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if self._enable_locks:
      return

    raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    The priority passed to the lock manager is taken from the callbacks, if
    there are any.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @return: Value returned by the lock manager's acquire function
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    priority = None
    if self._cbs:
      priority = self._cbs.CurrentPriority()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    if acquired is not None:
      return acquired

    raise LockAcquireTimeout()

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-phase hooks, executes the LU and
    runs the post-phase hooks. In dry-run mode the LU is not executed.

    @param lu: Logical unit to execute
    @return: Result of the LU as returned by the post-phase hooks callback,
        or the LU's dry-run result

    """
    initial_write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hooks_mgr = self.BuildHooksManager(lu)
    pre_results = hooks_mgr.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, pre_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    submit_mj_fn = _FailingSubmitManyJobs
    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs

    try:
      exec_result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      post_results = hooks_mgr.RunPhase(constants.HOOKS_PHASE_POST)
      return lu.HooksCallBack(constants.HOOKS_PHASE_POST, post_results,
                              self.Log, exec_result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if initial_write_count != self.context.cfg.write_count:
        hooks_mgr.RunConfigUpdate()

  def BuildHooksManager(self, lu):
    """Creates the hooks manager for a logical unit.

    @param lu: Logical unit to build the hooks manager for
    @return: Object built by the C{BuildFromLu} function of L{hmclass}

    """
    hooks_runner_fn = lu.rpc.call_hooks_runner

    return self.hmclass.BuildFromLu(hooks_runner_fn, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit to execute
    @type level: int
    @param level: Lock level to process
    @type calc_timeout: callable
    @param calc_timeout: Function returning the timeout to use for acquiring
        locks
    @return: Result of the logical unit

    """
    glm = self.context.glm
    # Both flags must be computed before the LU declares its locks
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Not a lock level anymore, time to run the LU
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        return self._ExecLU(lu)
      except AssertionError:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, err, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    if adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    if not (adding_locks or acquiring_locks):
      # Nothing to lock at this level, go on with the next one
      return self._LockAndExecLU(lu, level + 1, calc_timeout)

    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]
    opportunistic = lu.opportunistic_locks[level]

    try:
      assert adding_locks ^ acquiring_locks, \
        "Locks must be either added or acquired"

      if acquiring_locks:
        # Acquiring locks
        self._AcquireLocks(level, lu.needed_locks[level], share,
                           opportunistic, calc_timeout())
      else:
        # Adding locks; they are registered for removal before being added
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks

        try:
          glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          logging.exception("Detected lock error in level %s for locks"
                            " %s, shared=%s", level, add_locks, share)
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), most likely because of another"
            " job who added them first" % add_locks,
            errors.ECODE_NOTUNIQUE)

      try:
        return self._LockAndExecLU(lu, level + 1, calc_timeout)
      finally:
        if level in lu.remove_locks:
          glm.remove(level, lu.remove_locks[level])
    finally:
      if glm.is_owned(level):
        glm.release(level)

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: Result of the logical unit implementing the opcode
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: If C{op} is not an opcode or if the opcode
        needs the BGL while locks are disabled
    @raise errors.OpCodeUnknown: If there is no logical unit for the opcode
    @raise errors.OpResultError: If the result doesn't pass the opcode's
        result check (not raised in dry-run mode)

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is not None:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
    else:
      calc_timeout = lambda: None

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if resultcheck_fn is not None and not resultcheck_fn(result):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn,
                                    utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing is done if there are no callbacks.

    """
    if not self._cbs:
      return

    self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: Number of the current step
    @param total: Total number of steps
    @param message: Description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: Warning message, used as format string if positional
        arguments are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

    text = message
    if args:
      text = text % tuple(args)

    if text:
      logging.warning(text)
      self.Log(" - WARNING: %s" % text)

    if "hint" in kwargs:
      # NOTE(review): the leading whitespace of this literal may have been
      # collapsed when the source was extracted -- confirm against upstream
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: Message, used as format string if positional arguments
        are given

    """
    text = message
    if args:
      text = text % tuple(args)

    logging.info(text)
    self.Log(" - INFO: %s" % text)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID is set

    """
    if self._ec_id:
      return self._ec_id

    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
592