Package ganeti :: Module mcpu
[hide private]
[frames | no frames]

Source Code for Module ganeti.mcpu

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the logic behind the cluster operations 
 23   
 24  This module implements the logic for doing operations in the cluster. There 
 25  are two kinds of classes defined: 
 26    - logical units, which know how to deal with their specific opcode only 
 27    - the processor, which dispatches the opcodes to their logical units 
 28   
 29  """ 
 30   
 31  import logging 
 32  import random 
 33  import time 
 34  import itertools 
 35   
 36  from ganeti import opcodes 
 37  from ganeti import constants 
 38  from ganeti import errors 
 39  from ganeti import cmdlib 
 40  from ganeti import locking 
 41  from ganeti import utils 
 42  from ganeti import compat 
 43   
 44   
 45  _OP_PREFIX = "Op"  # prefix _LUNameForOpName asserts on opcode class names
 46  _LU_PREFIX = "LU"  # prefix _LUNameForOpName puts in place of _OP_PREFIX
class LockAcquireTimeout(Exception):
  """Signals that locks could not be acquired within the given timeout.

  """
53
def _CalculateLockAttemptTimeouts():
  """Builds the list of timeouts used for successive lock attempts.

  The first entry is C{constants.LOCK_ATTEMPTS_MINWAIT}; every following
  entry is derived from its predecessor and clamped to the range
  [C{LOCK_ATTEMPTS_MINWAIT}, C{LOCK_ATTEMPTS_MAXWAIT}]. Entries are added
  until their sum reaches C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list
  @return: Timeout per attempt, in the order the attempts are made

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Once the timed attempts add up to at least LOCK_ATTEMPTS_TIMEOUT the
  # caller falls back to a blocking acquire
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    grown = (timeouts[-1] * 1.05) ** 1.25

    # The upper cap gives other jobs a chance to run while we are still
    # trying to get our locks; the lower cap is only there for safety
    capped = max(min(grown, constants.LOCK_ATTEMPTS_MAXWAIT),
                 constants.LOCK_ATTEMPTS_MINWAIT)

    timeouts.append(capped)
    total += capped

  return timeouts
78
class LockAttemptTimeoutStrategy(object):
  """Hands out the timeout to use for each successive lock attempt.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once at class creation; every instance iterates over it
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Prepares a fresh sequence of attempt timeouts.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._random_fn = _random_fn
    self._time_fn = _time_fn
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: The next timeout with a random variation applied, or C{None}
      once all timeouts are used up (the caller should then do a blocking
      acquire)

    """
    # Only the first remaining item is consumed; the loop body always
    # returns
    for base in self._timeouts:
      # Add a small variation (-/+ 5%) to the timeout. This helps in
      # situations where two or more jobs are fighting for the same lock(s).
      spread = base * 0.1
      return base + ((self._random_fn() * spread) - (spread * 0.5))

    # No more timeouts, do blocking acquire
    return None
123
class OpExecCbBase: # pylint: disable=W0232
  """Interface for the callbacks used while an opcode is executed.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU is executed.

    It is called when the LU's Exec() method is about to start, that is,
    after all locks have been acquired. The default does nothing.

    """

  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    The default does nothing.

    """

  def CheckCancel(self):
    """Checks whether the job has been cancelled.

    The default does nothing.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, unless overridden

    """
    raise NotImplementedError()
154
def _LUNameForOpName(opname):
  """Derives the logical unit name from an opcode name.

  The leading L{_OP_PREFIX} of the opcode name is replaced with
  L{_LU_PREFIX}.

  @param opname: Opcode name, must start with L{_OP_PREFIX}
  @return: Name of the corresponding logical unit

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
164
def _ComputeDispatchTable():
  """Builds the mapping from opcode classes to logical unit classes.

  Only opcodes whose C{WITH_LU} attribute is true are included; the logical
  unit is looked up in L{cmdlib} by the name L{_LUNameForOpName} returns.

  @rtype: dict
  @return: Opcode class to logical unit class

  """
  table = {}

  for op_class in opcodes.OP_MAPPING.values():
    if op_class.WITH_LU:
      table[op_class] = getattr(cmdlib, _LUNameForOpName(op_class.__name__))

  return table
173
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is always copied if the source has one. The priority is
  only copied if the destination has none set, and the default comment is
  only applied if the destination has no (non-empty) comment.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # Read and write the comment through the same attribute name, so the
  # check and the assignment cannot drift apart
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
195
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for a
  L{cmdlib.ResultWithJobs} the contained jobs are submitted and the
  submission result is stored in the other return values.

  @param submit_fn: Function called with the jobs to submit
  @param op: Opcode which produced the result; its basic parameters are
    copied to the opcodes of the submitted jobs
  @param result: Result as returned by the logical unit
  @return: C{result} unchanged if it is not a L{cmdlib.ResultWithJobs},
    otherwise its C{other} dictionary with the job submission result stored
    under L{constants.JOB_IDS_KEY}

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # This is an explicit loop as it is run for its side effects only.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
221
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  @raise errors.ProgrammerError: Always

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")
  raise errors.ProgrammerError(msg)
229
def _RpcResultsToHooksResults(rpc_results):
  """Converts RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{mcpu.HooksMaster}

  """
  converted = {}

  for (node, res) in rpc_results.items():
    converted[node] = (res.fail_msg, res.offline, res.payload)

  return converted
243
class Processor(object):
  """Runs opcodes by dispatching them to their logical units.

  """
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Sets up a processor bound to the given context.

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor is allowed to use locks

    """
    self._cbs = None
    self._ec_id = ec_id
    self._enable_locks = enable_locks
    self.context = context
    self.rpc = context.rpc
    self.hmclass = HooksMaster

  def _CheckLocksEnabled(self):
    """Makes sure locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if self._enable_locks:
      return

    raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @param priority: Priority for acquiring the lock(s), passed on to the
      lock manager
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Give the callbacks a chance to abort before waiting for locks
    if self._cbs:
      self._cbs.CheckCancel()

    lock_result = self.context.glm.acquire(level, names, shared=shared,
                                           timeout=timeout,
                                           priority=priority)
    if lock_result is None:
      raise LockAcquireTimeout()

    return lock_result

  def _ExecLU(self, lu):
    """Runs a logical unit: prerequisites, pre-hooks, Exec and post-hooks.

    @param lu: Logical unit to execute
    @return: The LU's dry-run result in dry-run mode, otherwise the value
      returned by the LU's post-phase hooks callback

    """
    # Remembered to detect configuration writes done during execution
    writes_before = self.context.cfg.write_count
    lu.CheckPrereq()

    hooks = self.BuildHooksManager(lu)
    pre_results = hooks.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, pre_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    submit_fn = _FailingSubmitManyJobs
    if self._cbs:
      submit_fn = self._cbs.SubmitManyJobs

    try:
      result = _ProcessResult(submit_fn, lu.op, lu.Exec(self.Log))
      post_results = hooks.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, post_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if writes_before != self.context.cfg.write_count:
        hooks.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Creates the hooks manager for a logical unit via C{self.hmclass}.

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit to execute
    @param level: Lock level to handle in this call
    @param calc_timeout: Function returning the timeout to use for acquiring
      locks
    @param priority: Priority for acquiring lock(s)
    @return: Result of executing the logical unit

    """
    # Both are evaluated before DeclareLocks is called for this level
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # All levels have been handled, the LU can run now
      if self._cbs:
        self._cbs.NotifyStart()

      return self._ExecLU(lu)

    if adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    if not (adding_locks or acquiring_locks):
      # Nothing to do at this level
      return self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]

    try:
      assert adding_locks ^ acquiring_locks, \
        "Locks must be either added or acquired"

      if acquiring_locks:
        # Acquiring locks
        self._AcquireLocks(level, lu.needed_locks[level], share,
                           calc_timeout(), priority)
      else:
        # Adding locks; they are registered for removal before being added
        new_locks = lu.add_locks[level]
        lu.remove_locks[level] = new_locks

        try:
          self.context.glm.add(level, new_locks, acquired=1, shared=share)
        except errors.LockError:
          logging.exception("Detected lock error in level %s for locks"
                            " %s, shared=%s", level, new_locks, share)
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), most likely because of another"
            " job who added them first" % new_locks,
            errors.ECODE_NOTUNIQUE)

      try:
        return self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
      finally:
        if level in lu.remove_locks:
          self.context.glm.remove(level, lu.remove_locks[level])
    finally:
      if self.context.glm.is_owned(level):
        self.context.glm.release(level)

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is not None:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
    else:
      calc_timeout = lambda: None

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                       calc_timeout, priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    check_fn = op.OP_RESULT
    if check_fn is not None and not check_fn(result):
      logging.error("Expected opcode result matching %s, got %s",
                    check_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (check_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Passes the arguments on to the feedback callback, if there is one.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

    text = message
    if args:
      text = message % tuple(args)

    if text:
      logging.warning(text)
      self.Log(" - WARNING: %s" % text)

    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    text = message
    if args:
      text = message % tuple(args)

    logging.info(text)
    self.Log(" - INFO: %s" % text)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID is set

    """
    if self._ec_id:
      return self._ec_id

    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
532
class HooksMaster(object):
  """Runs the hooks of an operation according to its parameters.

  """
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be
      None, indicating that no conversion is necessary.
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    # The pre-phase environment is built once and reused by RunPhase,
    # RunConfigUpdate and the post-phase environment
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment for a hooks phase.

    The variables returned by C{build_env_fn} are prefixed according to the
    phase; for the post phase the result is merged with the pre-phase
    environment.

    @param phase: one of L{constants.HOOKS_PHASE_PRE} or
      L{constants.HOOKS_PHASE_POST}
    @rtype: dict
    @return: environment for the given phase
    @raise AssertionError: for an unknown phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        # The prefix is added here, the keys must not carry it already
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    @param node_list: nodes to run the hooks on
    @param hpath: hooks path passed to the execution function
    @param phase: hooks phase
    @param phase_env: phase-specific environment, joined with the fixed
      variables set here
    @return: whatever C{hooks_execution_fn} returns

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict((str(key), str(val)) for (key, val) in env.items())

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the
    form {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the results as returned by C{hooks_execution_fn} (before they
        are adapted), or None if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes in the
        pre phase
    @raise errors.HooksAbort: on failure of one of the hooks in the pre phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return None

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        # log_fn is documented to take a single string, hence the message
        # is formatted here
        self.log_fn("Communication failure to node %s: %s" %
                    (node_name, fail_msg))
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Pre-phase failures abort the operation, see below
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    """Creates a hooks master from a logical unit.

    @param hooks_execution_fn: function that will execute the hooks
    @param lu: logical unit providing the opcode, hooks path and type, the
      hooks nodes and environment, and the logging function
    @rtype: L{HooksMaster}
    @return: hooks master for the given logical unit

    """
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)
758