
Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking

class _LockAcquireTimeout(Exception):
  """Internal exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [1.0]

  # Wait for a total of at least 150s before doing a blocking acquire
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap timeout at 10 seconds. This gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving
    # to a blocking acquire.
    if timeout > 10.0:
      timeout = 10.0

    elif timeout < 0.1:
      # Lower boundary for safety
      timeout = 0.1

    result.append(timeout)

  return result
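
# Illustrative sketch, not part of the original module: the helper name
# _ExampleTimeoutSeriesProperties is hypothetical and only demonstrates the
# properties guaranteed by _CalculateLockAttemptTimeouts() above.
def _ExampleTimeoutSeriesProperties():
  """Shows the shape of the per-attempt timeout series."""
  series = _CalculateLockAttemptTimeouts()
  # The first attempt waits exactly one second
  assert series[0] == 1.0
  # Every entry is clamped to the [0.1, 10.0] range
  assert min(series) >= 0.1 and max(series) <= 10.0
  # The attempts add up to at least 150s before a blocking acquire is used
  assert sum(series) >= 150.0
  return series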

class _LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_attempt",
    "_random_fn",
    "_start_time",
    "_time_fn",
    "_running_timeout",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @type attempt: int
    @param attempt: Current attempt number
    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    if attempt < 0:
      raise ValueError("Attempt must be zero or positive")

    self._attempt = attempt
    self._time_fn = _time_fn
    self._random_fn = _random_fn

    try:
      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
    except IndexError:
      # No more timeouts, do blocking acquire
      timeout = None

    self._running_timeout = locking.RunningTimeout(timeout, False,
                                                   _time_fn=_time_fn)

  def NextAttempt(self):
    """Returns the strategy for the next attempt.

    """
    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
                                       _time_fn=self._time_fn,
                                       _random_fn=self._random_fn)

  def CalcRemainingTimeout(self):
    """Returns the remaining timeout.

    """
    timeout = self._running_timeout.Remaining()

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
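
# Illustrative sketch, not part of the original module: the acquire/retry
# pattern that Processor.ExecOpCode below builds around this strategy.
# "acquire_fn" is a hypothetical callable that takes a timeout in seconds
# (or None for a blocking acquire) and returns None on timeout.
def _ExampleRetryLoop(acquire_fn):
  """Retries an acquire with growing timeouts until it succeeds."""
  strategy = _LockAttemptTimeoutStrategy()
  while True:
    # Once the precalculated timeouts are exhausted, CalcRemainingTimeout()
    # returns None and the acquire becomes blocking
    result = acquire_fn(strategy.CalcRemainingTimeout())
    if result is not None:
      return result
    # Timed out, move on to the next attempt with its own (longer) timeout
    strategy = strategy.NextAttempt()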

class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the LU's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether the job has been cancelled.

    """
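
# Illustrative sketch, not part of the original module: a minimal
# OpExecCbBase subclass of the kind a job-queue implementation might pass to
# Processor.ExecOpCode (the class name and behaviour here are hypothetical).
class _ExampleOpExecCb(OpExecCbBase):
  def NotifyStart(self):
    # All locks are held at this point; the LU's Exec() is about to run
    logging.debug("Starting LU execution")

  def Feedback(self, *args):
    # Callers pass either (message, ) or (log_type, message)
    logging.info("LU feedback: %s", args[-1])

  def CheckCancel(self):
    # A real implementation would raise an exception here if the job
    # had been cancelled in the meantime
    pass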

class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
    }
  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks

    """
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout)

    return acquired
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result
  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, until there are no more locks to acquire. Then it executes
    the given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout())

          if acquired is None:
            raise _LockAcquireTimeout()

        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, which added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result
  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpCode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      timeout_strategy = _LockAttemptTimeoutStrategy()

      while True:
        try:
          acquire_timeout = timeout_strategy.CalcRemainingTimeout()

          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
          # and in a shared fashion otherwise (to prevent concurrent run with
          # an exclusive LU)
          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                                not lu_class.REQ_BGL, acquire_timeout) is None:
            raise _LockAcquireTimeout()

          try:
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"

            try:
              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                         timeout_strategy.CalcRemainingTimeout)
            finally:
              if self._ec_id:
                self.context.cfg.DropECReservations(self._ec_id)

          finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)

        except _LockAcquireTimeout:
          # Timeout while waiting for lock, try again
          pass

        timeout_strategy = timeout_strategy.NextAttempt()

    finally:
      self._cbs = None
  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)
  def GetECId(self):
    """Returns the execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
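
# Illustrative sketch, not part of the original module: how a caller might
# drive the Processor for a single opcode. "context" stands for a
# GanetiContext and "ec_id" for a job-derived execution context id; both
# are assumptions about the caller's environment.
def _ExampleExecuteOpCode(context, ec_id):
  """Dispatches one opcode through the Processor."""
  op = opcodes.OpQueryClusterInfo()
  # The dispatch table maps the opcode class to its logical unit
  assert Processor.DISPATCH_TABLE[op.__class__] is cmdlib.LUQueryClusterInfo
  proc = Processor(context, ec_id)
  # OpExecCbBase's no-op callbacks are enough for this sketch
  return proc.ExecOpCode(op, OpExecCbBase())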

class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)
  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
      return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue
      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results
  def RunConfigUpdate(self):
    """Run the special configuration update hook.

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)
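
# Illustrative sketch, not part of the original module: an approximation of
# the environment a hook script receives from _RunWrapper for a post-phase
# run of cluster verification. The function name and the "cfg" parameter are
# hypothetical; the authoritative values come from the constants module and
# the LU's BuildHooksEnv().
def _ExampleHookEnv(cfg):
  """Builds a dict resembling what hook scripts see in their environment."""
  return {
    "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
    "GANETI_HOOKS_VERSION": str(constants.HOOKS_VERSION),
    "GANETI_HOOKS_PHASE": constants.HOOKS_PHASE_POST,
    "GANETI_HOOKS_PATH": cmdlib.LUVerifyCluster.HPATH,
    "GANETI_OP_CODE": opcodes.OpVerifyCluster.OP_ID,
    "GANETI_OBJECT_TYPE": cmdlib.LUVerifyCluster.HTYPE,
    "GANETI_DATA_DIR": constants.DATA_DIR,
    "GANETI_CLUSTER": cfg.GetClusterName(),
    "GANETI_MASTER": cfg.GetMasterNode(),
    # ...plus one GANETI_-prefixed entry per key returned by the LU's
    # BuildHooksEnv()
    }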