
Source Code for Module ganeti.mcpu

#
#

# Copyright (C) 2006, 2007, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """

def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result

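# Editor's note: an illustrative, standalone re-computation of the timeout
# series above. The constant values used here are assumptions made for the
# example; the authoritative ones are LOCK_ATTEMPTS_MINWAIT,
# LOCK_ATTEMPTS_MAXWAIT and LOCK_ATTEMPTS_TIMEOUT in ganeti/constants.py.
def _ExampleLockTimeoutSeries(minwait=1.0, maxwait=15.0, total=10.0):
  """Illustrative sketch of how the per-attempt timeouts grow."""
  series = [minwait]
  while sum(series) < total:
    timeout = (series[-1] * 1.05) ** 1.25
    series.append(max(minwait, min(timeout, maxwait)))
  return series

# With the assumed values this yields roughly
# [1.0, 1.06, 1.15, 1.26, 1.42, 1.65, 1.99, 2.52]; once the attempts are
# exhausted, LockAttemptTimeoutStrategy.NextAttempt returns None and the
# caller falls back to a blocking acquire.
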
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout

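# Editor's note: an illustrative sketch of how a caller would consume the
# strategy above. "acquire_fn" is a hypothetical callable standing in for
# something like Processor._AcquireLocks; it is assumed to raise
# LockAcquireTimeout when the given per-attempt timeout expires.
def _ExampleRetryingAcquire(acquire_fn):
  """Retries an acquire with growing timeouts, finally blocking."""
  strategy = LockAttemptTimeoutStrategy()

  while True:
    # None means "no timeout", i.e. a blocking acquire
    timeout = strategy.NextAttempt()
    try:
      return acquire_fn(timeout)
    except LockAcquireTimeout:
      if timeout is None:
        # A blocking acquire should never time out; re-raise defensively
        raise
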
class OpExecCbBase:  # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError

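# Editor's note: an illustrative minimal implementation of the callback
# interface above. The real implementation lives in the job queue
# (jqueue.py); this hypothetical variant only logs feedback and never
# cancels or submits jobs.
class _ExampleLoggingCallbacks(OpExecCbBase):
  """Illustrative OpExecCbBase subclass for ad-hoc opcode execution."""
  def NotifyStart(self):
    logging.info("All locks acquired, starting the LU's Exec()")

  def Feedback(self, *args):
    # The job queue usually passes (timestamp, log_type, log_msg); this
    # sketch simply logs whatever it receives
    logging.info("LU feedback: %r", args)

  def CheckCancel(self):
    # Cancellation is not supported in this sketch
    pass
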
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]

def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)

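# Editor's note: an illustrative check of the naming convention the dispatch
# table relies on. The opcode/LU names used here are examples only; the
# authoritative set is whatever opcodes.OP_MAPPING and cmdlib actually define.
def _ExampleDispatchNames():
  """Illustrative sketch of the OpXxx -> LUXxx name mapping."""
  assert _LUNameForOpName("OpClusterQuery") == "LUClusterQuery"
  assert _LUNameForOpName("OpInstanceCreate") == "LUInstanceCreate"
  # The resulting DISPATCH_TABLE therefore maps opcode classes to LU classes,
  # e.g. {opcodes.OpClusterQuery: cmdlib.LUClusterQuery, ...}, limited to
  # opcodes that declare WITH_LU.
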
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ProcessResult(self, result):
    """Examines opcode result.

    If necessary, additional processing on the result is done.

    """
    if isinstance(result, cmdlib.ResultWithJobs):
      # Submit jobs
      job_submission = self._cbs.SubmitManyJobs(result.jobs)

      # Build dictionary
      result = result.other

      assert constants.JOB_IDS_KEY not in result, \
        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

      result[constants.JOB_IDS_KEY] = job_submission

    return result

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = self._ProcessResult(lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, which added them first" % add_locks,
              errors.ECODE_FAULT)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU)
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, calc_timeout(),
                         priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                       calc_timeout, priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s" %
                                 resultcheck_fn)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id

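# Editor's note: an illustrative, heavily simplified driver for the Processor
# class above. In the real daemon the job queue builds the processor and
# passes its own OpExecCbBase implementation; "context" (a GanetiContext),
# "op" (an opcodes.OpCode instance) and "ec_id" are placeholders supplied by
# the caller in this sketch.
def _ExampleExecuteOpcode(context, op, ec_id):
  """Illustrative single-opcode execution."""
  proc = Processor(context, ec_id)
  # A bare OpExecCbBase gives no-op callbacks; a real caller would use a
  # subclass such as the logging sketch shown after OpExecCbBase above
  cbs = OpExecCbBase()
  # timeout=None waits for locks indefinitely; a finite timeout turns lock
  # contention into a LockAcquireTimeout exception instead
  return proc.ExecOpCode(op, cbs, timeout=None, priority=None)
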
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)

    if self.lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, self.lu.BuildHooksNodes())

    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.lu.HPATH is not None:
      lu_env = self.lu.BuildHooksEnv()
      if lu_env:
        assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in lu_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    cfg = self.lu.cfg

    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.lu.HTYPE:
      env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE

    if cfg is not None:
      env["GANETI_CLUSTER"] = cfg.GetClusterName()
      env["GANETI_MASTER"] = cfg.GetMasterNode()

    if phase_env:
      assert not (set(env) & set(phase_env)), "Environment variables conflict"
      env.update(phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results

    errs = []
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue

      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue

      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

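# Editor's note: an illustrative example of the environment a hook script ends
# up seeing. All values are made up for the example; the fixed keys come from
# _RunWrapper, the phase-prefixed keys from _BuildEnv, and the LU-specific
# entries from the LU's BuildHooksEnv().
_EXAMPLE_HOOK_ENVIRONMENT = {
  "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",     # constants.HOOKS_PATH
  "GANETI_HOOKS_VERSION": "2",                 # constants.HOOKS_VERSION
  "GANETI_HOOKS_PHASE": "pre",                 # or "post"
  "GANETI_HOOKS_PATH": "instance-start",       # hypothetical LU HPATH
  "GANETI_OP_CODE": "OP_INSTANCE_STARTUP",     # hypothetical opcode OP_ID
  "GANETI_DATA_DIR": "/var/lib/ganeti",        # constants.DATA_DIR
  "GANETI_OBJECT_TYPE": "INSTANCE",            # hypothetical LU HTYPE
  "GANETI_CLUSTER": "cluster.example.com",     # from the cluster configuration
  "GANETI_MASTER": "node1.example.com",        # from the cluster configuration
  "GANETI_INSTANCE_NAME": "web1.example.com",  # hypothetical BuildHooksEnv key
  }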