Package ganeti :: Module mcpu
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the logic behind the cluster operations 
 23   
 24  This module implements the logic for doing operations in the cluster. There 
 25  are two kinds of classes defined: 
 26    - logical units, which know how to deal with their specific opcode only 
 27    - the processor, which dispatches the opcodes to their logical units 
 28   
 29  """ 
 30   
 31  import logging 
 32  import random 
 33  import time 
 34   
 35  from ganeti import opcodes 
 36  from ganeti import constants 
 37  from ganeti import errors 
 38  from ganeti import rpc 
 39  from ganeti import cmdlib 
 40  from ganeti import locking 
 41  from ganeti import utils 
 42   
 43   
 44  _OP_PREFIX = "Op" 
 45  _LU_PREFIX = "LU" 
 46   
 47   
class LockAcquireTimeout(Exception):
  """Raised when the requested locks could not be acquired in time.

  """
52 53
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  @return: List of per-attempt timeouts, starting at
    L{constants.LOCK_ATTEMPTS_MINWAIT} and growing until their sum reaches
    at least L{constants.LOCK_ATTEMPTS_TIMEOUT}

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Keep adding attempts until their combined duration covers at least
  # LOCK_ATTEMPTS_TIMEOUT; only then does the caller fall back to a
  # blocking acquire
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout slightly faster than linearly
    next_timeout = (timeouts[-1] * 1.05) ** 1.25

    # Clamp into [MINWAIT, MAXWAIT]: the upper cap gives other jobs a
    # chance to run while we keep retrying, the lower bound is for safety
    next_timeout = min(next_timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    next_timeout = max(next_timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    timeouts.append(next_timeout)
    total += next_timeout

  return timeouts
77 78
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Yields a growing sequence of timeouts (with a small random variation),
  followed by C{None} to signal a final blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Pre-computed per-attempt timeouts, shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: Timeout in seconds for the next lock acquire attempt, or
      C{None} once the pre-computed timeouts are exhausted (meaning a
      blocking acquire should be done)

    """
    try:
      # Use the builtin instead of the iterator's .next() method; the
      # builtin exists since Python 2.6 and is also valid in Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
122 123
class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  Subclasses override the methods below; the defaults do nothing.

  """
  def NotifyStart(self):
    """Called right before the LU's Exec() method is invoked.

    At this point every required lock has already been acquired.

    """

  def Feedback(self, *args):
    """Forwards feedback messages from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Checks whether the job has been cancelled in the meantime.

    """
145 146
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @param opname: OpCode class name, starting with C{"Op"}
  @return: The matching LU class name, with C{"LU"} substituted for the
    C{"Op"} prefix

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Strip the "Op" prefix and put "LU" in its place
  suffix = opname[len(_OP_PREFIX):]

  return _LU_PREFIX + suffix
155 156
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @return: Dictionary mapping OpCode classes to their LU classes

  """
  table = {}

  # Only opcodes declaring WITH_LU get an entry; the LU class is looked up
  # in cmdlib under the name derived from the opcode's name
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))

  return table
164 165
class Processor(object):
  """Object which runs OpCodes"""
  # Maps OpCode classes to LU classes; computed once at module load time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    # Runtime callbacks (L{OpExecCbBase}); only set while ExecOpCode runs
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @param priority: Priority for acquiring the locks
    @return: The names of the acquired locks, as returned by the lock manager
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    # Give a cancelled job the chance to bail out before (possibly)
    # blocking on locks
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, the pre-phase hooks, the LU's Exec method, the
    post-phase hooks and, when the configuration was modified, the
    config-update hook.

    @param lu: The Logical Unit instance to execute
    @return: Result of the LU's Exec method, possibly processed by its
        HooksCallBack

    """
    # Remember the config serial number, to detect below whether this LU
    # changed the configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # Only run the config-update hook if the LU actually wrote the config
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical Unit instance
    @param level: Current lock level; recursion stops once the level is no
        longer a member of L{locking.LEVELS}
    @param calc_timeout: Function returning the remaining lock timeout
    @param priority: Priority for acquiring lock(s)
    @return: Result of the LU execution

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # All lock levels have been handled; notify the callbacks and run
      # the LU itself
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Locks added here are removed again once the LU has finished
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          # Recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to lock at this level; continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.OpCodeUnknown: If no LU is registered for the opcode class

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      # No overall timeout; every acquire may block indefinitely
      calc_timeout = lambda: None
    else:
      # All lock acquires share one deadline; each call to calc_timeout
      # returns the time still remaining
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU)
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, calc_timeout(),
                         priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                     priority)
        finally:
          # Free any execution-context reservations made by this LU
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      # Callbacks must not outlive this opcode's execution
      self._cbs = None

  def Log(self, *args):
    """Forward call to feedback callback function.

    Does nothing when no callbacks are registered.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: Number of the current step
    @param total: Total number of steps
    @param message: Description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: Message text, optionally a %-format string
    @param args: Arguments interpolated into the message, if any

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @return: The execution context identifier
    @raise errors.ProgrammerError: If no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
411 412
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master.

    @param callfn: Function doing the actual (remote) hooks call; usually
        C{rpc.call_hooks_runner}
    @param lu: The Logical Unit whose hooks should be run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    # Per-phase target node lists, computed once at construction time
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: Tuple of (environment dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    # An LU without a hooks path (HPATH) contributes neither environment
    # nor target nodes
    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables get the common "GANETI_" prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: Nodes to run the hooks on
    @param hpath: Hooks path (subdirectory) to run
    @param phase: Hooks phase (pre or post)
    @return: Result of the hooks call

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    # During cluster init there is no configuration yet, hence the check
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # Hook scripts only get string-valued environment variables
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      # No results at all indicates the RPC layer couldn't reach any node
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        # Post-phase failures are non-fatal, only warn
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        # Offline nodes are skipped silently
        continue
      msg = res.fail_msg
      if msg:
        # Per-node RPC failure: warn, but keep processing the other nodes
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Pre-phase script failures are collected and abort the LU below
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    # This hook runs on the master node only
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)
539