
Source Code for Module ganeti.cmdlib

    1  # 
    2  # 
    3   
    4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
    5  # 
    6  # This program is free software; you can redistribute it and/or modify 
    7  # it under the terms of the GNU General Public License as published by 
    8  # the Free Software Foundation; either version 2 of the License, or 
    9  # (at your option) any later version. 
   10  # 
   11  # This program is distributed in the hope that it will be useful, but 
   12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
   13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
   14  # General Public License for more details. 
   15  # 
   16  # You should have received a copy of the GNU General Public License 
   17  # along with this program; if not, write to the Free Software 
   18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
   19  # 02110-1301, USA. 
   20   
   21   
   22  """Module implementing the master-side code.""" 
   23   
   24  # pylint: disable-msg=W0201,C0302 
   25   
   26  # W0201 since most LU attributes are defined in CheckPrereq or similar 
   27  # functions 
   28   
    29  # C0302: since we have waaaay too many lines in this module 
   30   
   31  import os 
   32  import os.path 
   33  import time 
   34  import re 
   35  import platform 
   36  import logging 
   37  import copy 
   38  import OpenSSL 
   39  import socket 
   40  import tempfile 
   41  import shutil 
   42   
   43  from ganeti import ssh 
   44  from ganeti import utils 
   45  from ganeti import errors 
   46  from ganeti import hypervisor 
   47  from ganeti import locking 
   48  from ganeti import constants 
   49  from ganeti import objects 
   50  from ganeti import serializer 
   51  from ganeti import ssconf 
   52  from ganeti import uidpool 
   53  from ganeti import compat 
   54  from ganeti import masterd 
   55  from ganeti import netutils 
   56   
   57  import ganeti.masterd.instance # pylint: disable-msg=W0611 
58 59 60 # Modifiable default values; need to define these here before the 61 # actual LUs 62 63 -def _EmptyList():
64 """Returns an empty list. 65 66 """ 67 return []
68
69 70 -def _EmptyDict():
71 """Returns an empty dict. 72 73 """ 74 return {}
75 76 77 #: The without-default default value 78 _NoDefault = object() 79 80 81 #: The no-type (value too complex to check in the type system) 82 _NoType = object()
83 84 85 # Some basic types 86 -def _TNotNone(val):
87 """Checks if the given value is not None. 88 89 """ 90 return val is not None
91
92 93 -def _TNone(val):
94 """Checks if the given value is None. 95 96 """ 97 return val is None
98
99 100 -def _TBool(val):
101 """Checks if the given value is a boolean. 102 103 """ 104 return isinstance(val, bool)
105
106 107 -def _TInt(val):
108 """Checks if the given value is an integer. 109 110 """ 111 return isinstance(val, int)
112
113 114 -def _TFloat(val):
115 """Checks if the given value is a float. 116 117 """ 118 return isinstance(val, float)
119
120 121 -def _TString(val):
122 """Checks if the given value is a string. 123 124 """ 125 return isinstance(val, basestring)
126
127 128 -def _TTrue(val):
129 """Checks if a given value evaluates to a boolean True value. 130 131 """ 132 return bool(val)
133
134 135 -def _TElemOf(target_list):
136 """Builds a function that checks if a given value is a member of a list. 137 138 """ 139 return lambda val: val in target_list
140
141 142 # Container types 143 -def _TList(val):
144 """Checks if the given value is a list. 145 146 """ 147 return isinstance(val, list)
148
149 150 -def _TDict(val):
151 """Checks if the given value is a dictionary. 152 153 """ 154 return isinstance(val, dict)
155
156 157 -def _TIsLength(size):
158 """Check is the given container is of the given size. 159 160 """ 161 return lambda container: len(container) == size
162
163 164 # Combinator types 165 -def _TAnd(*args):
166 """Combine multiple functions using an AND operation. 167 168 """ 169 def fn(val): 170 return compat.all(t(val) for t in args)
171 return fn 172
173 174 -def _TOr(*args):
175 """Combine multiple functions using an AND operation. 176 177 """ 178 def fn(val): 179 return compat.any(t(val) for t in args)
180 return fn 181
182 183 -def _TMap(fn, test):
184 """Checks that a modified version of the argument passes the given test. 185 186 """ 187 return lambda val: test(fn(val))
188 189 190 # Type aliases 191 192 #: a non-empty string 193 _TNonEmptyString = _TAnd(_TString, _TTrue) 194 195 196 #: a maybe non-empty string 197 _TMaybeString = _TOr(_TNonEmptyString, _TNone) 198 199 200 #: a maybe boolean (bool or none) 201 _TMaybeBool = _TOr(_TBool, _TNone) 202 203 204 #: a positive integer 205 _TPositiveInt = _TAnd(_TInt, lambda v: v >= 0) 206 207 #: a strictly positive integer 208 _TStrictPositiveInt = _TAnd(_TInt, lambda v: v > 0)
209 210 211 -def _TListOf(my_type):
212 """Checks if a given value is a list with all elements of the same type. 213 214 """ 215 return _TAnd(_TList, 216 lambda lst: compat.all(my_type(v) for v in lst))
217
218 219 -def _TDictOf(key_type, val_type):
220 """Checks a dict type for the type of its key/values. 221 222 """ 223 return _TAnd(_TDict, 224 lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys()) 225 and compat.all(val_type(v) 226 for v in my_dict.values())))
227 228 229 # Common opcode attributes 230 231 #: output fields for a query operation 232 _POutputFields = ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)) 233 234 235 #: the shutdown timeout 236 _PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, 237 _TPositiveInt) 238 239 #: the force parameter 240 _PForce = ("force", False, _TBool) 241 242 #: a required instance name (for single-instance LUs) 243 _PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString) 244 245 246 #: a required node name (for single-node LUs) 247 _PNodeName = ("node_name", _NoDefault, _TNonEmptyString) 248 249 #: the migration type (live/non-live) 250 _PMigrationMode = ("mode", None, _TOr(_TNone, 251 _TElemOf(constants.HT_MIGRATION_MODES))) 252 253 #: the obsolete 'live' mode (boolean) 254 _PMigrationLive = ("live", None, _TMaybeBool)
255 256 257 # End types 258 -class LogicalUnit(object):
259 """Logical Unit base class. 260 261 Subclasses must follow these rules: 262 - implement ExpandNames 263 - implement CheckPrereq (except when tasklets are used) 264 - implement Exec (except when tasklets are used) 265 - implement BuildHooksEnv 266 - redefine HPATH and HTYPE 267 - optionally redefine their run requirements: 268 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively 269 270 Note that all commands require root permissions. 271 272 @ivar dry_run_result: the value (if any) that will be returned to the caller 273 in dry-run mode (signalled by opcode dry_run parameter) 274 @cvar _OP_PARAMS: a list of opcode attributes, their defaults values 275 they should get if not already defined, and types they must match 276 277 """ 278 HPATH = None 279 HTYPE = None 280 _OP_PARAMS = [] 281 REQ_BGL = True 282
283 - def __init__(self, processor, op, context, rpc):
284 """Constructor for LogicalUnit. 285 286 This needs to be overridden in derived classes in order to check op 287 validity. 288 289 """ 290 self.proc = processor 291 self.op = op 292 self.cfg = context.cfg 293 self.context = context 294 self.rpc = rpc 295 # Dicts used to declare locking needs to mcpu 296 self.needed_locks = None 297 self.acquired_locks = {} 298 self.share_locks = dict.fromkeys(locking.LEVELS, 0) 299 self.add_locks = {} 300 self.remove_locks = {} 301 # Used to force good behavior when calling helper functions 302 self.recalculate_locks = {} 303 self.__ssh = None 304 # logging 305 self.Log = processor.Log # pylint: disable-msg=C0103 306 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 307 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103 308 self.LogStep = processor.LogStep # pylint: disable-msg=C0103 309 # support for dry-run 310 self.dry_run_result = None 311 # support for generic debug attribute 312 if (not hasattr(self.op, "debug_level") or 313 not isinstance(self.op.debug_level, int)): 314 self.op.debug_level = 0 315 316 # Tasklets 317 self.tasklets = None 318 319 # The new kind-of-type-system 320 op_id = self.op.OP_ID 321 for attr_name, aval, test in self._OP_PARAMS: 322 if not hasattr(op, attr_name): 323 if aval == _NoDefault: 324 raise errors.OpPrereqError("Required parameter '%s.%s' missing" % 325 (op_id, attr_name), errors.ECODE_INVAL) 326 else: 327 if callable(aval): 328 dval = aval() 329 else: 330 dval = aval 331 setattr(self.op, attr_name, dval) 332 attr_val = getattr(op, attr_name) 333 if test == _NoType: 334 # no tests here 335 continue 336 if not callable(test): 337 raise errors.ProgrammerError("Validation for parameter '%s.%s' failed," 338 " given type is not a proper type (%s)" % 339 (op_id, attr_name, test)) 340 if not test(attr_val): 341 logging.error("OpCode %s, parameter %s, has invalid type %s/value %s", 342 self.op.OP_ID, attr_name, type(attr_val), attr_val) 343 raise errors.OpPrereqError("Parameter '%s.%s' fails validation" % 344 (op_id, attr_name), errors.ECODE_INVAL) 345 346 self.CheckArguments()
347
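# Editor's illustrative sketch (not part of ganeti.cmdlib, hypothetical LU):
# how a subclass would declare _OP_PARAMS so that the validation loop in
# __init__ above fills in defaults and type-checks every opcode attribute
# before CheckArguments() runs::
#
#   class LUExampleOperation(LogicalUnit):
#     _OP_PARAMS = [
#       _PInstanceName,                 # required, no default (_NoDefault)
#       _PForce,                        # optional, defaults to False
#       ("reason", "", _TString),       # hypothetical optional string param
#     ]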
348 - def __GetSSH(self):
349 """Returns the SshRunner object 350 351 """ 352 if not self.__ssh: 353 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName()) 354 return self.__ssh
355 356 ssh = property(fget=__GetSSH) 357
358 - def CheckArguments(self):
359 """Check syntactic validity for the opcode arguments. 360 361 This method is for doing a simple syntactic check and ensure 362 validity of opcode parameters, without any cluster-related 363 checks. While the same can be accomplished in ExpandNames and/or 364 CheckPrereq, doing these separate is better because: 365 366 - ExpandNames is left as as purely a lock-related function 367 - CheckPrereq is run after we have acquired locks (and possible 368 waited for them) 369 370 The function is allowed to change the self.op attribute so that 371 later methods can no longer worry about missing parameters. 372 373 """ 374 pass
375
376 - def ExpandNames(self):
377 """Expand names for this LU. 378 379 This method is called before starting to execute the opcode, and it should 380 update all the parameters of the opcode to their canonical form (e.g. a 381 short node name must be fully expanded after this method has successfully 382 completed). This way locking, hooks, logging, ecc. can work correctly. 383 384 LUs which implement this method must also populate the self.needed_locks 385 member, as a dict with lock levels as keys, and a list of needed lock names 386 as values. Rules: 387 388 - use an empty dict if you don't need any lock 389 - if you don't need any lock at a particular level omit that level 390 - don't put anything for the BGL level 391 - if you want all locks at a level use locking.ALL_SET as a value 392 393 If you need to share locks (rather than acquire them exclusively) at one 394 level you can modify self.share_locks, setting a true value (usually 1) for 395 that level. By default locks are not shared. 396 397 This function can also define a list of tasklets, which then will be 398 executed in order instead of the usual LU-level CheckPrereq and Exec 399 functions, if those are not defined by the LU. 400 401 Examples:: 402 403 # Acquire all nodes and one instance 404 self.needed_locks = { 405 locking.LEVEL_NODE: locking.ALL_SET, 406 locking.LEVEL_INSTANCE: ['instance1.example.com'], 407 } 408 # Acquire just two nodes 409 self.needed_locks = { 410 locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'], 411 } 412 # Acquire no locks 413 self.needed_locks = {} # No, you can't leave it to the default value None 414 415 """ 416 # The implementation of this method is mandatory only if the new LU is 417 # concurrent, so that old LUs don't need to be changed all at the same 418 # time. 419 if self.REQ_BGL: 420 self.needed_locks = {} # Exclusive LUs don't need locks. 421 else: 422 raise NotImplementedError
423
424 - def DeclareLocks(self, level):
425 """Declare LU locking needs for a level 426 427 While most LUs can just declare their locking needs at ExpandNames time, 428 sometimes there's the need to calculate some locks after having acquired 429 the ones before. This function is called just before acquiring locks at a 430 particular level, but after acquiring the ones at lower levels, and permits 431 such calculations. It can be used to modify self.needed_locks, and by 432 default it does nothing. 433 434 This function is only called if you have something already set in 435 self.needed_locks for the level. 436 437 @param level: Locking level which is going to be locked 438 @type level: member of ganeti.locking.LEVELS 439 440 """
441
442 - def CheckPrereq(self):
443 """Check prerequisites for this LU. 444 445 This method should check that the prerequisites for the execution 446 of this LU are fulfilled. It can do internode communication, but 447 it should be idempotent - no cluster or system changes are 448 allowed. 449 450 The method should raise errors.OpPrereqError in case something is 451 not fulfilled. Its return value is ignored. 452 453 This method should also update all the parameters of the opcode to 454 their canonical form if it hasn't been done by ExpandNames before. 455 456 """ 457 if self.tasklets is not None: 458 for (idx, tl) in enumerate(self.tasklets): 459 logging.debug("Checking prerequisites for tasklet %s/%s", 460 idx + 1, len(self.tasklets)) 461 tl.CheckPrereq() 462 else: 463 pass
464
465 - def Exec(self, feedback_fn):
466 """Execute the LU. 467 468 This method should implement the actual work. It should raise 469 errors.OpExecError for failures that are somewhat dealt with in 470 code, or expected. 471 472 """ 473 if self.tasklets is not None: 474 for (idx, tl) in enumerate(self.tasklets): 475 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets)) 476 tl.Exec(feedback_fn) 477 else: 478 raise NotImplementedError
479
480 - def BuildHooksEnv(self):
481 """Build hooks environment for this LU. 482 483 This method should return a three-node tuple consisting of: a dict 484 containing the environment that will be used for running the 485 specific hook for this LU, a list of node names on which the hook 486 should run before the execution, and a list of node names on which 487 the hook should run after the execution. 488 489 The keys of the dict must not have 'GANETI_' prefixed as this will 490 be handled in the hooks runner. Also note additional keys will be 491 added by the hooks runner. If the LU doesn't define any 492 environment, an empty dict (and not None) should be returned. 493 494 No nodes should be returned as an empty list (and not None). 495 496 Note that if the HPATH for a LU class is None, this function will 497 not be called. 498 499 """ 500 raise NotImplementedError
501
502 - def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
503 """Notify the LU about the results of its hooks. 504 505 This method is called every time a hooks phase is executed, and notifies 506 the Logical Unit about the hooks' result. The LU can then use it to alter 507 its result based on the hooks. By default the method does nothing and the 508 previous result is passed back unchanged but any LU can define it if it 509 wants to use the local cluster hook-scripts somehow. 510 511 @param phase: one of L{constants.HOOKS_PHASE_POST} or 512 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase 513 @param hook_results: the results of the multi-node hooks rpc call 514 @param feedback_fn: function used send feedback back to the caller 515 @param lu_result: the previous Exec result this LU had, or None 516 in the PRE phase 517 @return: the new Exec result, based on the previous result 518 and hook results 519 520 """ 521 # API must be kept, thus we ignore the unused argument and could 522 # be a function warnings 523 # pylint: disable-msg=W0613,R0201 524 return lu_result
525
526 - def _ExpandAndLockInstance(self):
527 """Helper function to expand and lock an instance. 528 529 Many LUs that work on an instance take its name in self.op.instance_name 530 and need to expand it and then declare the expanded name for locking. This 531 function does it, and then updates self.op.instance_name to the expanded 532 name. It also initializes needed_locks as a dict, if this hasn't been done 533 before. 534 535 """ 536 if self.needed_locks is None: 537 self.needed_locks = {} 538 else: 539 assert locking.LEVEL_INSTANCE not in self.needed_locks, \ 540 "_ExpandAndLockInstance called with instance-level locks set" 541 self.op.instance_name = _ExpandInstanceName(self.cfg, 542 self.op.instance_name) 543 self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
544
545 - def _LockInstancesNodes(self, primary_only=False):
546 """Helper function to declare instances' nodes for locking. 547 548 This function should be called after locking one or more instances to lock 549 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE] 550 with all primary or secondary nodes for instances already locked and 551 present in self.needed_locks[locking.LEVEL_INSTANCE]. 552 553 It should be called from DeclareLocks, and for safety only works if 554 self.recalculate_locks[locking.LEVEL_NODE] is set. 555 556 In the future it may grow parameters to just lock some instance's nodes, or 557 to just lock primaries or secondary nodes, if needed. 558 559 If should be called in DeclareLocks in a way similar to:: 560 561 if level == locking.LEVEL_NODE: 562 self._LockInstancesNodes() 563 564 @type primary_only: boolean 565 @param primary_only: only lock primary nodes of locked instances 566 567 """ 568 assert locking.LEVEL_NODE in self.recalculate_locks, \ 569 "_LockInstancesNodes helper function called with no nodes to recalculate" 570 571 # TODO: check if we're really been called with the instance locks held 572 573 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the 574 # future we might want to have different behaviors depending on the value 575 # of self.recalculate_locks[locking.LEVEL_NODE] 576 wanted_nodes = [] 577 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: 578 instance = self.context.cfg.GetInstanceInfo(instance_name) 579 wanted_nodes.append(instance.primary_node) 580 if not primary_only: 581 wanted_nodes.extend(instance.secondary_nodes) 582 583 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: 584 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes 585 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: 586 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) 587 588 del self.recalculate_locks[locking.LEVEL_NODE]
589
590 591 -class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
592 """Simple LU which runs no hooks. 593 594 This LU is intended as a parent for other LogicalUnits which will 595 run no hooks, in order to reduce duplicate code. 596 597 """ 598 HPATH = None 599 HTYPE = None 600
601 - def BuildHooksEnv(self):
602 """Empty BuildHooksEnv for NoHooksLu. 603 604 This just raises an error. 605 606 """ 607 assert False, "BuildHooksEnv called for NoHooksLUs"
608
609 610 -class Tasklet:
611 """Tasklet base class. 612 613 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or 614 they can mix legacy code with tasklets. Locking needs to be done in the LU, 615 tasklets know nothing about locks. 616 617 Subclasses must follow these rules: 618 - Implement CheckPrereq 619 - Implement Exec 620 621 """
622 - def __init__(self, lu):
623 self.lu = lu 624 625 # Shortcuts 626 self.cfg = lu.cfg 627 self.rpc = lu.rpc
628
629 - def CheckPrereq(self):
630 """Check prerequisites for this tasklets. 631 632 This method should check whether the prerequisites for the execution of 633 this tasklet are fulfilled. It can do internode communication, but it 634 should be idempotent - no cluster or system changes are allowed. 635 636 The method should raise errors.OpPrereqError in case something is not 637 fulfilled. Its return value is ignored. 638 639 This method should also update all parameters to their canonical form if it 640 hasn't been done before. 641 642 """ 643 pass
644
645 - def Exec(self, feedback_fn):
646 """Execute the tasklet. 647 648 This method should implement the actual work. It should raise 649 errors.OpExecError for failures that are somewhat dealt with in code, or 650 expected. 651 652 """ 653 raise NotImplementedError
654
655 656 -def _GetWantedNodes(lu, nodes):
657 """Returns list of checked and expanded node names. 658 659 @type lu: L{LogicalUnit} 660 @param lu: the logical unit on whose behalf we execute 661 @type nodes: list 662 @param nodes: list of node names or None for all nodes 663 @rtype: list 664 @return: the list of nodes, sorted 665 @raise errors.ProgrammerError: if the nodes parameter is wrong type 666 667 """ 668 if not nodes: 669 raise errors.ProgrammerError("_GetWantedNodes should only be called with a" 670 " non-empty list of nodes whose name is to be expanded.") 671 672 wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes] 673 return utils.NiceSort(wanted)
674
675 676 -def _GetWantedInstances(lu, instances):
677 """Returns list of checked and expanded instance names. 678 679 @type lu: L{LogicalUnit} 680 @param lu: the logical unit on whose behalf we execute 681 @type instances: list 682 @param instances: list of instance names or None for all instances 683 @rtype: list 684 @return: the list of instances, sorted 685 @raise errors.OpPrereqError: if the instances parameter is wrong type 686 @raise errors.OpPrereqError: if any of the passed instances is not found 687 688 """ 689 if instances: 690 wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances] 691 else: 692 wanted = utils.NiceSort(lu.cfg.GetInstanceList()) 693 return wanted
694
695 696 -def _GetUpdatedParams(old_params, update_dict, 697 use_default=True, use_none=False):
698 """Return the new version of a parameter dictionary. 699 700 @type old_params: dict 701 @param old_params: old parameters 702 @type update_dict: dict 703 @param update_dict: dict containing new parameter values, or 704 constants.VALUE_DEFAULT to reset the parameter to its default 705 value 706 @param use_default: boolean 707 @type use_default: whether to recognise L{constants.VALUE_DEFAULT} 708 values as 'to be deleted' values 709 @param use_none: boolean 710 @type use_none: whether to recognise C{None} values as 'to be 711 deleted' values 712 @rtype: dict 713 @return: the new parameter dictionary 714 715 """ 716 params_copy = copy.deepcopy(old_params) 717 for key, val in update_dict.iteritems(): 718 if ((use_default and val == constants.VALUE_DEFAULT) or 719 (use_none and val is None)): 720 try: 721 del params_copy[key] 722 except KeyError: 723 pass 724 else: 725 params_copy[key] = val 726 return params_copy
727
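# Editor's illustrative example (not part of ganeti.cmdlib): the merge
# semantics of _GetUpdatedParams, with made-up parameter names::
#
#   _GetUpdatedParams({"mem": 128, "vcpus": 2}, {"vcpus": 4})
#     -> {"mem": 128, "vcpus": 4}
#   _GetUpdatedParams({"mem": 128, "vcpus": 2},
#                     {"mem": constants.VALUE_DEFAULT})
#     -> {"vcpus": 2}   # "mem" is removed so the default applies again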
728 729 -def _CheckOutputFields(static, dynamic, selected):
730 """Checks whether all selected fields are valid. 731 732 @type static: L{utils.FieldSet} 733 @param static: static fields set 734 @type dynamic: L{utils.FieldSet} 735 @param dynamic: dynamic fields set 736 737 """ 738 f = utils.FieldSet() 739 f.Extend(static) 740 f.Extend(dynamic) 741 742 delta = f.NonMatching(selected) 743 if delta: 744 raise errors.OpPrereqError("Unknown output fields selected: %s" 745 % ",".join(delta), errors.ECODE_INVAL)
746
747 748 -def _CheckGlobalHvParams(params):
749 """Validates that given hypervisor params are not global ones. 750 751 This will ensure that instances don't get customised versions of 752 global params. 753 754 """ 755 used_globals = constants.HVC_GLOBALS.intersection(params) 756 if used_globals: 757 msg = ("The following hypervisor parameters are global and cannot" 758 " be customized at instance level, please modify them at" 759 " cluster level: %s" % utils.CommaJoin(used_globals)) 760 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
761
762 763 -def _CheckNodeOnline(lu, node):
764 """Ensure that a given node is online. 765 766 @param lu: the LU on behalf of which we make the check 767 @param node: the node to check 768 @raise errors.OpPrereqError: if the node is offline 769 770 """ 771 if lu.cfg.GetNodeInfo(node).offline: 772 raise errors.OpPrereqError("Can't use offline node %s" % node, 773 errors.ECODE_INVAL)
774
775 776 -def _CheckNodeNotDrained(lu, node):
777 """Ensure that a given node is not drained. 778 779 @param lu: the LU on behalf of which we make the check 780 @param node: the node to check 781 @raise errors.OpPrereqError: if the node is drained 782 783 """ 784 if lu.cfg.GetNodeInfo(node).drained: 785 raise errors.OpPrereqError("Can't use drained node %s" % node, 786 errors.ECODE_INVAL)
787
788 789 -def _CheckNodeHasOS(lu, node, os_name, force_variant):
790 """Ensure that a node supports a given OS. 791 792 @param lu: the LU on behalf of which we make the check 793 @param node: the node to check 794 @param os_name: the OS to query about 795 @param force_variant: whether to ignore variant errors 796 @raise errors.OpPrereqError: if the node is not supporting the OS 797 798 """ 799 result = lu.rpc.call_os_get(node, os_name) 800 result.Raise("OS '%s' not in supported OS list for node %s" % 801 (os_name, node), 802 prereq=True, ecode=errors.ECODE_INVAL) 803 if not force_variant: 804 _CheckOSVariant(result.payload, os_name)
805
806 807 -def _RequireFileStorage():
808 """Checks that file storage is enabled. 809 810 @raise errors.OpPrereqError: when file storage is disabled 811 812 """ 813 if not constants.ENABLE_FILE_STORAGE: 814 raise errors.OpPrereqError("File storage disabled at configure time", 815 errors.ECODE_INVAL)
816
817 818 -def _CheckDiskTemplate(template):
819 """Ensure a given disk template is valid. 820 821 """ 822 if template not in constants.DISK_TEMPLATES: 823 msg = ("Invalid disk template name '%s', valid templates are: %s" % 824 (template, utils.CommaJoin(constants.DISK_TEMPLATES))) 825 raise errors.OpPrereqError(msg, errors.ECODE_INVAL) 826 if template == constants.DT_FILE: 827 _RequireFileStorage() 828 return True
829
830 831 -def _CheckStorageType(storage_type):
832 """Ensure a given storage type is valid. 833 834 """ 835 if storage_type not in constants.VALID_STORAGE_TYPES: 836 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, 837 errors.ECODE_INVAL) 838 if storage_type == constants.ST_FILE: 839 _RequireFileStorage() 840 return True
841
842 843 -def _GetClusterDomainSecret():
844 """Reads the cluster domain secret. 845 846 """ 847 return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE, 848 strict=True)
849
850 851 -def _CheckInstanceDown(lu, instance, reason):
852 """Ensure that an instance is not running.""" 853 if instance.admin_up: 854 raise errors.OpPrereqError("Instance %s is marked to be up, %s" % 855 (instance.name, reason), errors.ECODE_STATE) 856 857 pnode = instance.primary_node 858 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] 859 ins_l.Raise("Can't contact node %s for instance information" % pnode, 860 prereq=True, ecode=errors.ECODE_ENVIRON) 861 862 if instance.name in ins_l.payload: 863 raise errors.OpPrereqError("Instance %s is running, %s" % 864 (instance.name, reason), errors.ECODE_STATE)
865
866 867 -def _ExpandItemName(fn, name, kind):
868 """Expand an item name. 869 870 @param fn: the function to use for expansion 871 @param name: requested item name 872 @param kind: text description ('Node' or 'Instance') 873 @return: the resolved (full) name 874 @raise errors.OpPrereqError: if the item is not found 875 876 """ 877 full_name = fn(name) 878 if full_name is None: 879 raise errors.OpPrereqError("%s '%s' not known" % (kind, name), 880 errors.ECODE_NOENT) 881 return full_name
882
883 884 -def _ExpandNodeName(cfg, name):
885 """Wrapper over L{_ExpandItemName} for nodes.""" 886 return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
887
888 889 -def _ExpandInstanceName(cfg, name):
890 """Wrapper over L{_ExpandItemName} for instance.""" 891 return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
892
893 894 -def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, 895 memory, vcpus, nics, disk_template, disks, 896 bep, hvp, hypervisor_name):
897 """Builds instance related env variables for hooks 898 899 This builds the hook environment from individual variables. 900 901 @type name: string 902 @param name: the name of the instance 903 @type primary_node: string 904 @param primary_node: the name of the instance's primary node 905 @type secondary_nodes: list 906 @param secondary_nodes: list of secondary nodes as strings 907 @type os_type: string 908 @param os_type: the name of the instance's OS 909 @type status: boolean 910 @param status: the should_run status of the instance 911 @type memory: string 912 @param memory: the memory size of the instance 913 @type vcpus: string 914 @param vcpus: the count of VCPUs the instance has 915 @type nics: list 916 @param nics: list of tuples (ip, mac, mode, link) representing 917 the NICs the instance has 918 @type disk_template: string 919 @param disk_template: the disk template of the instance 920 @type disks: list 921 @param disks: the list of (size, mode) pairs 922 @type bep: dict 923 @param bep: the backend parameters for the instance 924 @type hvp: dict 925 @param hvp: the hypervisor parameters for the instance 926 @type hypervisor_name: string 927 @param hypervisor_name: the hypervisor for the instance 928 @rtype: dict 929 @return: the hook environment for this instance 930 931 """ 932 if status: 933 str_status = "up" 934 else: 935 str_status = "down" 936 env = { 937 "OP_TARGET": name, 938 "INSTANCE_NAME": name, 939 "INSTANCE_PRIMARY": primary_node, 940 "INSTANCE_SECONDARIES": " ".join(secondary_nodes), 941 "INSTANCE_OS_TYPE": os_type, 942 "INSTANCE_STATUS": str_status, 943 "INSTANCE_MEMORY": memory, 944 "INSTANCE_VCPUS": vcpus, 945 "INSTANCE_DISK_TEMPLATE": disk_template, 946 "INSTANCE_HYPERVISOR": hypervisor_name, 947 } 948 949 if nics: 950 nic_count = len(nics) 951 for idx, (ip, mac, mode, link) in enumerate(nics): 952 if ip is None: 953 ip = "" 954 env["INSTANCE_NIC%d_IP" % idx] = ip 955 env["INSTANCE_NIC%d_MAC" % idx] = mac 956 env["INSTANCE_NIC%d_MODE" % idx] = mode 957 env["INSTANCE_NIC%d_LINK" % idx] = link 958 if mode == constants.NIC_MODE_BRIDGED: 959 env["INSTANCE_NIC%d_BRIDGE" % idx] = link 960 else: 961 nic_count = 0 962 963 env["INSTANCE_NIC_COUNT"] = nic_count 964 965 if disks: 966 disk_count = len(disks) 967 for idx, (size, mode) in enumerate(disks): 968 env["INSTANCE_DISK%d_SIZE" % idx] = size 969 env["INSTANCE_DISK%d_MODE" % idx] = mode 970 else: 971 disk_count = 0 972 973 env["INSTANCE_DISK_COUNT"] = disk_count 974 975 for source, kind in [(bep, "BE"), (hvp, "HV")]: 976 for key, value in source.items(): 977 env["INSTANCE_%s_%s" % (kind, key)] = value 978 979 return env
980
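# Editor's illustrative example (not part of ganeti.cmdlib): a call to
# _BuildInstanceHookEnv with made-up values, showing the kind of environment
# the hook scripts receive (after the hooks runner adds the GANETI_ prefix)::
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-image", True,
#                         128, 1, [("198.51.100.4", "aa:00:00:11:22:33",
#                         constants.NIC_MODE_BRIDGED, "xen-br0")],
#                         "drbd", [(1024, "rw")], {}, {}, "xen-pvm")
#
# yields, among others, INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
# INSTANCE_STATUS ("up"), INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP,
# INSTANCE_NIC0_BRIDGE, INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE and
# INSTANCE_DISK0_MODE.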
981 982 -def _NICListToTuple(lu, nics):
983 """Build a list of nic information tuples. 984 985 This list is suitable to be passed to _BuildInstanceHookEnv or as a return 986 value in LUQueryInstanceData. 987 988 @type lu: L{LogicalUnit} 989 @param lu: the logical unit on whose behalf we execute 990 @type nics: list of L{objects.NIC} 991 @param nics: list of nics to convert to hooks tuples 992 993 """ 994 hooks_nics = [] 995 cluster = lu.cfg.GetClusterInfo() 996 for nic in nics: 997 ip = nic.ip 998 mac = nic.mac 999 filled_params = cluster.SimpleFillNIC(nic.nicparams) 1000 mode = filled_params[constants.NIC_MODE] 1001 link = filled_params[constants.NIC_LINK] 1002 hooks_nics.append((ip, mac, mode, link)) 1003 return hooks_nics
1004
1005 1006 -def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1007 """Builds instance related env variables for hooks from an object. 1008 1009 @type lu: L{LogicalUnit} 1010 @param lu: the logical unit on whose behalf we execute 1011 @type instance: L{objects.Instance} 1012 @param instance: the instance for which we should build the 1013 environment 1014 @type override: dict 1015 @param override: dictionary with key/values that will override 1016 our values 1017 @rtype: dict 1018 @return: the hook environment dictionary 1019 1020 """ 1021 cluster = lu.cfg.GetClusterInfo() 1022 bep = cluster.FillBE(instance) 1023 hvp = cluster.FillHV(instance) 1024 args = { 1025 'name': instance.name, 1026 'primary_node': instance.primary_node, 1027 'secondary_nodes': instance.secondary_nodes, 1028 'os_type': instance.os, 1029 'status': instance.admin_up, 1030 'memory': bep[constants.BE_MEMORY], 1031 'vcpus': bep[constants.BE_VCPUS], 1032 'nics': _NICListToTuple(lu, instance.nics), 1033 'disk_template': instance.disk_template, 1034 'disks': [(disk.size, disk.mode) for disk in instance.disks], 1035 'bep': bep, 1036 'hvp': hvp, 1037 'hypervisor_name': instance.hypervisor, 1038 } 1039 if override: 1040 args.update(override) 1041 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1042
1043 1044 -def _AdjustCandidatePool(lu, exceptions):
1045 """Adjust the candidate pool after node operations. 1046 1047 """ 1048 mod_list = lu.cfg.MaintainCandidatePool(exceptions) 1049 if mod_list: 1050 lu.LogInfo("Promoted nodes to master candidate role: %s", 1051 utils.CommaJoin(node.name for node in mod_list)) 1052 for name in mod_list: 1053 lu.context.ReaddNode(name) 1054 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) 1055 if mc_now > mc_max: 1056 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % 1057 (mc_now, mc_max))
1058
1059 1060 -def _DecideSelfPromotion(lu, exceptions=None):
1061 """Decide whether I should promote myself as a master candidate. 1062 1063 """ 1064 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size 1065 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) 1066 # the new node will increase mc_max with one, so: 1067 mc_should = min(mc_should + 1, cp_size) 1068 return mc_now < mc_should
1069
1070 1071 -def _CheckNicsBridgesExist(lu, target_nics, target_node):
1072 """Check that the brigdes needed by a list of nics exist. 1073 1074 """ 1075 cluster = lu.cfg.GetClusterInfo() 1076 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics] 1077 brlist = [params[constants.NIC_LINK] for params in paramslist 1078 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED] 1079 if brlist: 1080 result = lu.rpc.call_bridges_exist(target_node, brlist) 1081 result.Raise("Error checking bridges on destination node '%s'" % 1082 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1083
1084 1085 -def _CheckInstanceBridgesExist(lu, instance, node=None):
1086 """Check that the brigdes needed by an instance exist. 1087 1088 """ 1089 if node is None: 1090 node = instance.primary_node 1091 _CheckNicsBridgesExist(lu, instance.nics, node)
1092
1093 1094 -def _CheckOSVariant(os_obj, name):
1095 """Check whether an OS name conforms to the os variants specification. 1096 1097 @type os_obj: L{objects.OS} 1098 @param os_obj: OS object to check 1099 @type name: string 1100 @param name: OS name passed by the user, to check for validity 1101 1102 """ 1103 if not os_obj.supported_variants: 1104 return 1105 variant = objects.OS.GetVariant(name) 1106 if not variant: 1107 raise errors.OpPrereqError("OS name must include a variant", 1108 errors.ECODE_INVAL) 1109 1110 if variant not in os_obj.supported_variants: 1111 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1112
1113 1114 -def _GetNodeInstancesInner(cfg, fn):
1115 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1116
1117 1118 -def _GetNodeInstances(cfg, node_name):
1119 """Returns a list of all primary and secondary instances on a node. 1120 1121 """ 1122 1123 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1124
1125 1126 -def _GetNodePrimaryInstances(cfg, node_name):
1127 """Returns primary instances on a node. 1128 1129 """ 1130 return _GetNodeInstancesInner(cfg, 1131 lambda inst: node_name == inst.primary_node)
1132
1133 1134 -def _GetNodeSecondaryInstances(cfg, node_name):
1135 """Returns secondary instances on a node. 1136 1137 """ 1138 return _GetNodeInstancesInner(cfg, 1139 lambda inst: node_name in inst.secondary_nodes)
1140
1141 1142 -def _GetStorageTypeArgs(cfg, storage_type):
1143 """Returns the arguments for a storage type. 1144 1145 """ 1146 # Special case for file storage 1147 if storage_type == constants.ST_FILE: 1148 # storage.FileStorage wants a list of storage directories 1149 return [[cfg.GetFileStorageDir()]] 1150 1151 return []
1152
1153 1154 -def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1155 faulty = [] 1156 1157 for dev in instance.disks: 1158 cfg.SetDiskID(dev, node_name) 1159 1160 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) 1161 result.Raise("Failed to get disk status from node %s" % node_name, 1162 prereq=prereq, ecode=errors.ECODE_ENVIRON) 1163 1164 for idx, bdev_status in enumerate(result.payload): 1165 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: 1166 faulty.append(idx) 1167 1168 return faulty
1169
1170 1171 -def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1172 """Check the sanity of iallocator and node arguments and use the 1173 cluster-wide iallocator if appropriate. 1174 1175 Check that at most one of (iallocator, node) is specified. If none is 1176 specified, then the LU's opcode's iallocator slot is filled with the 1177 cluster-wide default iallocator. 1178 1179 @type iallocator_slot: string 1180 @param iallocator_slot: the name of the opcode iallocator slot 1181 @type node_slot: string 1182 @param node_slot: the name of the opcode target node slot 1183 1184 """ 1185 node = getattr(lu.op, node_slot, None) 1186 iallocator = getattr(lu.op, iallocator_slot, None) 1187 1188 if node is not None and iallocator is not None: 1189 raise errors.OpPrereqError("Do not specify both, iallocator and node.", 1190 errors.ECODE_INVAL) 1191 elif node is None and iallocator is None: 1192 default_iallocator = lu.cfg.GetDefaultIAllocator() 1193 if default_iallocator: 1194 setattr(lu.op, iallocator_slot, default_iallocator) 1195 else: 1196 raise errors.OpPrereqError("No iallocator or node given and no" 1197 " cluster-wide default iallocator found." 1198 " Please specify either an iallocator or a" 1199 " node, or set a cluster-wide default" 1200 " iallocator.")
1201
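# Editor's summary (not part of ganeti.cmdlib) of the decision made by
# _CheckIAllocatorOrNode above:
#
#   iallocator given, node given   -> OpPrereqError (mutually exclusive)
#   iallocator given, node missing -> use the given iallocator
#   iallocator missing, node given -> use the given node
#   neither given                  -> fall back to the cluster-wide default
#                                     iallocator, or OpPrereqError if none
#                                     is configured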
1202 1203 -class LUPostInitCluster(LogicalUnit):
1204 """Logical unit for running hooks after cluster initialization. 1205 1206 """ 1207 HPATH = "cluster-init" 1208 HTYPE = constants.HTYPE_CLUSTER 1209
1210 - def BuildHooksEnv(self):
1211 """Build hooks env. 1212 1213 """ 1214 env = {"OP_TARGET": self.cfg.GetClusterName()} 1215 mn = self.cfg.GetMasterNode() 1216 return env, [], [mn]
1217
1218 - def Exec(self, feedback_fn):
1219 """Nothing to do. 1220 1221 """ 1222 return True
1223
1224 1225 -class LUDestroyCluster(LogicalUnit):
1226 """Logical unit for destroying the cluster. 1227 1228 """ 1229 HPATH = "cluster-destroy" 1230 HTYPE = constants.HTYPE_CLUSTER 1231
1232 - def BuildHooksEnv(self):
1233 """Build hooks env. 1234 1235 """ 1236 env = {"OP_TARGET": self.cfg.GetClusterName()} 1237 return env, [], []
1238
1239 - def CheckPrereq(self):
1240 """Check prerequisites. 1241 1242 This checks whether the cluster is empty. 1243 1244 Any errors are signaled by raising errors.OpPrereqError. 1245 1246 """ 1247 master = self.cfg.GetMasterNode() 1248 1249 nodelist = self.cfg.GetNodeList() 1250 if len(nodelist) != 1 or nodelist[0] != master: 1251 raise errors.OpPrereqError("There are still %d node(s) in" 1252 " this cluster." % (len(nodelist) - 1), 1253 errors.ECODE_INVAL) 1254 instancelist = self.cfg.GetInstanceList() 1255 if instancelist: 1256 raise errors.OpPrereqError("There are still %d instance(s) in" 1257 " this cluster." % len(instancelist), 1258 errors.ECODE_INVAL)
1259
1260 - def Exec(self, feedback_fn):
1261 """Destroys the cluster. 1262 1263 """ 1264 master = self.cfg.GetMasterNode() 1265 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1266 1267 # Run post hooks on master node before it's removed 1268 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) 1269 try: 1270 hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) 1271 except: 1272 # pylint: disable-msg=W0702 1273 self.LogWarning("Errors occurred running hooks on %s" % master) 1274 1275 result = self.rpc.call_node_stop_master(master, False) 1276 result.Raise("Could not disable the master role") 1277 1278 if modify_ssh_setup: 1279 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) 1280 utils.CreateBackup(priv_key) 1281 utils.CreateBackup(pub_key) 1282 1283 return master
1284
1285 1286 -def _VerifyCertificate(filename):
1287 """Verifies a certificate for LUVerifyCluster. 1288 1289 @type filename: string 1290 @param filename: Path to PEM file 1291 1292 """ 1293 try: 1294 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, 1295 utils.ReadFile(filename)) 1296 except Exception, err: # pylint: disable-msg=W0703 1297 return (LUVerifyCluster.ETYPE_ERROR, 1298 "Failed to load X509 certificate %s: %s" % (filename, err)) 1299 1300 (errcode, msg) = \ 1301 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN, 1302 constants.SSL_CERT_EXPIRATION_ERROR) 1303 1304 if msg: 1305 fnamemsg = "While verifying %s: %s" % (filename, msg) 1306 else: 1307 fnamemsg = None 1308 1309 if errcode is None: 1310 return (None, fnamemsg) 1311 elif errcode == utils.CERT_WARNING: 1312 return (LUVerifyCluster.ETYPE_WARNING, fnamemsg) 1313 elif errcode == utils.CERT_ERROR: 1314 return (LUVerifyCluster.ETYPE_ERROR, fnamemsg) 1315 1316 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1317
1318 1319 -class LUVerifyCluster(LogicalUnit):
1320 """Verifies the cluster status. 1321 1322 """ 1323 HPATH = "cluster-verify" 1324 HTYPE = constants.HTYPE_CLUSTER 1325 _OP_PARAMS = [ 1326 ("skip_checks", _EmptyList, 1327 _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))), 1328 ("verbose", False, _TBool), 1329 ("error_codes", False, _TBool), 1330 ("debug_simulate_errors", False, _TBool), 1331 ] 1332 REQ_BGL = False 1333 1334 TCLUSTER = "cluster" 1335 TNODE = "node" 1336 TINSTANCE = "instance" 1337 1338 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") 1339 ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") 1340 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") 1341 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") 1342 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") 1343 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") 1344 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") 1345 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") 1346 ENODEDRBD = (TNODE, "ENODEDRBD") 1347 ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER") 1348 ENODEFILECHECK = (TNODE, "ENODEFILECHECK") 1349 ENODEHOOKS = (TNODE, "ENODEHOOKS") 1350 ENODEHV = (TNODE, "ENODEHV") 1351 ENODELVM = (TNODE, "ENODELVM") 1352 ENODEN1 = (TNODE, "ENODEN1") 1353 ENODENET = (TNODE, "ENODENET") 1354 ENODEOS = (TNODE, "ENODEOS") 1355 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") 1356 ENODEORPHANLV = (TNODE, "ENODEORPHANLV") 1357 ENODERPC = (TNODE, "ENODERPC") 1358 ENODESSH = (TNODE, "ENODESSH") 1359 ENODEVERSION = (TNODE, "ENODEVERSION") 1360 ENODESETUP = (TNODE, "ENODESETUP") 1361 ENODETIME = (TNODE, "ENODETIME") 1362 1363 ETYPE_FIELD = "code" 1364 ETYPE_ERROR = "ERROR" 1365 ETYPE_WARNING = "WARNING" 1366
1367 - class NodeImage(object):
1368 """A class representing the logical and physical status of a node. 1369 1370 @type name: string 1371 @ivar name: the node name to which this object refers 1372 @ivar volumes: a structure as returned from 1373 L{ganeti.backend.GetVolumeList} (runtime) 1374 @ivar instances: a list of running instances (runtime) 1375 @ivar pinst: list of configured primary instances (config) 1376 @ivar sinst: list of configured secondary instances (config) 1377 @ivar sbp: diction of {secondary-node: list of instances} of all peers 1378 of this node (config) 1379 @ivar mfree: free memory, as reported by hypervisor (runtime) 1380 @ivar dfree: free disk, as reported by the node (runtime) 1381 @ivar offline: the offline status (config) 1382 @type rpc_fail: boolean 1383 @ivar rpc_fail: whether the RPC verify call was successfull (overall, 1384 not whether the individual keys were correct) (runtime) 1385 @type lvm_fail: boolean 1386 @ivar lvm_fail: whether the RPC call didn't return valid LVM data 1387 @type hyp_fail: boolean 1388 @ivar hyp_fail: whether the RPC call didn't return the instance list 1389 @type ghost: boolean 1390 @ivar ghost: whether this is a known node or not (config) 1391 @type os_fail: boolean 1392 @ivar os_fail: whether the RPC call didn't return valid OS data 1393 @type oslist: list 1394 @ivar oslist: list of OSes as diagnosed by DiagnoseOS 1395 1396 """
1397 - def __init__(self, offline=False, name=None):
1398 self.name = name 1399 self.volumes = {} 1400 self.instances = [] 1401 self.pinst = [] 1402 self.sinst = [] 1403 self.sbp = {} 1404 self.mfree = 0 1405 self.dfree = 0 1406 self.offline = offline 1407 self.rpc_fail = False 1408 self.lvm_fail = False 1409 self.hyp_fail = False 1410 self.ghost = False 1411 self.os_fail = False 1412 self.oslist = {}
1413
1414 - def ExpandNames(self):
1415 self.needed_locks = { 1416 locking.LEVEL_NODE: locking.ALL_SET, 1417 locking.LEVEL_INSTANCE: locking.ALL_SET, 1418 } 1419 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1420
1421 - def _Error(self, ecode, item, msg, *args, **kwargs):
1422 """Format an error message. 1423 1424 Based on the opcode's error_codes parameter, either format a 1425 parseable error code, or a simpler error string. 1426 1427 This must be called only from Exec and functions called from Exec. 1428 1429 """ 1430 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) 1431 itype, etxt = ecode 1432 # first complete the msg 1433 if args: 1434 msg = msg % args 1435 # then format the whole message 1436 if self.op.error_codes: 1437 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) 1438 else: 1439 if item: 1440 item = " " + item 1441 else: 1442 item = "" 1443 msg = "%s: %s%s: %s" % (ltype, itype, item, msg) 1444 # and finally report it via the feedback_fn 1445 self._feedback_fn(" - %s" % msg)
1446
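# Editor's illustrative example (not part of ganeti.cmdlib): with the
# error_codes opcode parameter set, _Error above reports lines such as
#
#   - ERROR:ENODERPC:node:node1.example.com:unable to verify node: no data returned
#
# and without it, the simpler form
#
#   - ERROR: node node1.example.com: unable to verify node: no data returned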
1447 - def _ErrorIf(self, cond, *args, **kwargs):
1448 """Log an error message if the passed condition is True. 1449 1450 """ 1451 cond = bool(cond) or self.op.debug_simulate_errors 1452 if cond: 1453 self._Error(*args, **kwargs) 1454 # do not mark the operation as failed for WARN cases only 1455 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: 1456 self.bad = self.bad or cond
1457
1458 - def _VerifyNode(self, ninfo, nresult):
1459 """Perform some basic validation on data returned from a node. 1460 1461 - check the result data structure is well formed and has all the 1462 mandatory fields 1463 - check ganeti version 1464 1465 @type ninfo: L{objects.Node} 1466 @param ninfo: the node to check 1467 @param nresult: the results from the node 1468 @rtype: boolean 1469 @return: whether overall this call was successful (and we can expect 1470 reasonable values in the respose) 1471 1472 """ 1473 node = ninfo.name 1474 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1475 1476 # main result, nresult should be a non-empty dict 1477 test = not nresult or not isinstance(nresult, dict) 1478 _ErrorIf(test, self.ENODERPC, node, 1479 "unable to verify node: no data returned") 1480 if test: 1481 return False 1482 1483 # compares ganeti version 1484 local_version = constants.PROTOCOL_VERSION 1485 remote_version = nresult.get("version", None) 1486 test = not (remote_version and 1487 isinstance(remote_version, (list, tuple)) and 1488 len(remote_version) == 2) 1489 _ErrorIf(test, self.ENODERPC, node, 1490 "connection to node returned invalid data") 1491 if test: 1492 return False 1493 1494 test = local_version != remote_version[0] 1495 _ErrorIf(test, self.ENODEVERSION, node, 1496 "incompatible protocol versions: master %s," 1497 " node %s", local_version, remote_version[0]) 1498 if test: 1499 return False 1500 1501 # node seems compatible, we can actually try to look into its results 1502 1503 # full package version 1504 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], 1505 self.ENODEVERSION, node, 1506 "software version mismatch: master %s, node %s", 1507 constants.RELEASE_VERSION, remote_version[1], 1508 code=self.ETYPE_WARNING) 1509 1510 hyp_result = nresult.get(constants.NV_HYPERVISOR, None) 1511 if isinstance(hyp_result, dict): 1512 for hv_name, hv_result in hyp_result.iteritems(): 1513 test = hv_result is not None 1514 _ErrorIf(test, self.ENODEHV, node, 1515 "hypervisor %s verify failure: '%s'", hv_name, hv_result) 1516 1517 1518 test = nresult.get(constants.NV_NODESETUP, 1519 ["Missing NODESETUP results"]) 1520 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", 1521 "; ".join(test)) 1522 1523 return True
1524
1525 - def _VerifyNodeTime(self, ninfo, nresult, 1526 nvinfo_starttime, nvinfo_endtime):
1527 """Check the node time. 1528 1529 @type ninfo: L{objects.Node} 1530 @param ninfo: the node to check 1531 @param nresult: the remote results for the node 1532 @param nvinfo_starttime: the start time of the RPC call 1533 @param nvinfo_endtime: the end time of the RPC call 1534 1535 """ 1536 node = ninfo.name 1537 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1538 1539 ntime = nresult.get(constants.NV_TIME, None) 1540 try: 1541 ntime_merged = utils.MergeTime(ntime) 1542 except (ValueError, TypeError): 1543 _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time") 1544 return 1545 1546 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): 1547 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged) 1548 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW): 1549 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime) 1550 else: 1551 ntime_diff = None 1552 1553 _ErrorIf(ntime_diff is not None, self.ENODETIME, node, 1554 "Node time diverges by at least %s from master node time", 1555 ntime_diff)
1556
1557 - def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1558 """Check the node time. 1559 1560 @type ninfo: L{objects.Node} 1561 @param ninfo: the node to check 1562 @param nresult: the remote results for the node 1563 @param vg_name: the configured VG name 1564 1565 """ 1566 if vg_name is None: 1567 return 1568 1569 node = ninfo.name 1570 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1571 1572 # checks vg existence and size > 20G 1573 vglist = nresult.get(constants.NV_VGLIST, None) 1574 test = not vglist 1575 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") 1576 if not test: 1577 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, 1578 constants.MIN_VG_SIZE) 1579 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) 1580 1581 # check pv names 1582 pvlist = nresult.get(constants.NV_PVLIST, None) 1583 test = pvlist is None 1584 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") 1585 if not test: 1586 # check that ':' is not present in PV names, since it's a 1587 # special character for lvcreate (denotes the range of PEs to 1588 # use on the PV) 1589 for _, pvname, owner_vg in pvlist: 1590 test = ":" in pvname 1591 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" 1592 " '%s' of VG '%s'", pvname, owner_vg)
1593
1594 - def _VerifyNodeNetwork(self, ninfo, nresult):
1595 """Check the node time. 1596 1597 @type ninfo: L{objects.Node} 1598 @param ninfo: the node to check 1599 @param nresult: the remote results for the node 1600 1601 """ 1602 node = ninfo.name 1603 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1604 1605 test = constants.NV_NODELIST not in nresult 1606 _ErrorIf(test, self.ENODESSH, node, 1607 "node hasn't returned node ssh connectivity data") 1608 if not test: 1609 if nresult[constants.NV_NODELIST]: 1610 for a_node, a_msg in nresult[constants.NV_NODELIST].items(): 1611 _ErrorIf(True, self.ENODESSH, node, 1612 "ssh communication with node '%s': %s", a_node, a_msg) 1613 1614 test = constants.NV_NODENETTEST not in nresult 1615 _ErrorIf(test, self.ENODENET, node, 1616 "node hasn't returned node tcp connectivity data") 1617 if not test: 1618 if nresult[constants.NV_NODENETTEST]: 1619 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys()) 1620 for anode in nlist: 1621 _ErrorIf(True, self.ENODENET, node, 1622 "tcp communication with node '%s': %s", 1623 anode, nresult[constants.NV_NODENETTEST][anode]) 1624 1625 test = constants.NV_MASTERIP not in nresult 1626 _ErrorIf(test, self.ENODENET, node, 1627 "node hasn't returned node master IP reachability data") 1628 if not test: 1629 if not nresult[constants.NV_MASTERIP]: 1630 if node == self.master_node: 1631 msg = "the master node cannot reach the master IP (not configured?)" 1632 else: 1633 msg = "cannot reach the master IP" 1634 _ErrorIf(True, self.ENODENET, node, msg)
1635 1636
1637 - def _VerifyInstance(self, instance, instanceconfig, node_image):
1638 """Verify an instance. 1639 1640 This function checks to see if the required block devices are 1641 available on the instance's node. 1642 1643 """ 1644 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1645 node_current = instanceconfig.primary_node 1646 1647 node_vol_should = {} 1648 instanceconfig.MapLVsByNode(node_vol_should) 1649 1650 for node in node_vol_should: 1651 n_img = node_image[node] 1652 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: 1653 # ignore missing volumes on offline or broken nodes 1654 continue 1655 for volume in node_vol_should[node]: 1656 test = volume not in n_img.volumes 1657 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, 1658 "volume %s missing on node %s", volume, node) 1659 1660 if instanceconfig.admin_up: 1661 pri_img = node_image[node_current] 1662 test = instance not in pri_img.instances and not pri_img.offline 1663 _ErrorIf(test, self.EINSTANCEDOWN, instance, 1664 "instance not running on its primary node %s", 1665 node_current) 1666 1667 for node, n_img in node_image.items(): 1668 if (not node == node_current): 1669 test = instance in n_img.instances 1670 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, 1671 "instance should not run on node %s", node)
1672
1673 - def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1674 """Verify if there are any unknown volumes in the cluster. 1675 1676 The .os, .swap and backup volumes are ignored. All other volumes are 1677 reported as unknown. 1678 1679 @type reserved: L{ganeti.utils.FieldSet} 1680 @param reserved: a FieldSet of reserved volume names 1681 1682 """ 1683 for node, n_img in node_image.items(): 1684 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: 1685 # skip non-healthy nodes 1686 continue 1687 for volume in n_img.volumes: 1688 test = ((node not in node_vol_should or 1689 volume not in node_vol_should[node]) and 1690 not reserved.Matches(volume)) 1691 self._ErrorIf(test, self.ENODEORPHANLV, node, 1692 "volume %s is unknown", volume)
1693
1694 - def _VerifyOrphanInstances(self, instancelist, node_image):
1695 """Verify the list of running instances. 1696 1697 This checks what instances are running but unknown to the cluster. 1698 1699 """ 1700 for node, n_img in node_image.items(): 1701 for o_inst in n_img.instances: 1702 test = o_inst not in instancelist 1703 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, 1704 "instance %s on node %s should not exist", o_inst, node)
1705
1706 - def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1707 """Verify N+1 Memory Resilience. 1708 1709 Check that if one single node dies we can still start all the 1710 instances it was primary for. 1711 1712 """ 1713 for node, n_img in node_image.items(): 1714 # This code checks that every node which is now listed as 1715 # secondary has enough memory to host all instances it is 1716 # supposed to should a single other node in the cluster fail. 1717 # FIXME: not ready for failover to an arbitrary node 1718 # FIXME: does not support file-backed instances 1719 # WARNING: we currently take into account down instances as well 1720 # as up ones, considering that even if they're down someone 1721 # might want to start them even in the event of a node failure. 1722 for prinode, instances in n_img.sbp.items(): 1723 needed_mem = 0 1724 for instance in instances: 1725 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) 1726 if bep[constants.BE_AUTO_BALANCE]: 1727 needed_mem += bep[constants.BE_MEMORY] 1728 test = n_img.mfree < needed_mem 1729 self._ErrorIf(test, self.ENODEN1, node, 1730 "not enough memory on to accommodate" 1731 " failovers should peer node %s fail", prinode)
1732
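# Illustrative sketch (not part of the ganeti.cmdlib source; the names and
# dict shapes below are hypothetical stand-ins for the real NodeImage and
# BE-parameter objects, and the code is kept commented so the listing above
# stays a faithful rendering of the module).  n_img.sbp maps each primary
# node to the instances that use the current node as their secondary, so the
# accounting in _VerifyNPlusOneMemory could be restated standalone as:
#
#   def _SketchNPlusOneNeed(sbp, be_params):
#     """Return {primary_node: memory needed here if that primary fails}."""
#     needed = {}
#     for prinode, instances in sbp.items():
#       needed[prinode] = sum(be_params[inst]["memory"]
#                             for inst in instances
#                             if be_params[inst]["auto_balance"])
#     return needed
#
#   # _SketchNPlusOneNeed({"node1": ["inst1", "inst2"]},
#   #                     {"inst1": {"memory": 512, "auto_balance": True},
#   #                      "inst2": {"memory": 1024, "auto_balance": False}})
#   # -> {"node1": 512}; the check then compares each value against
#   # nimg.mfree on this node.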
1733 - def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum, 1734 master_files):
1735 """Verifies and computes the node required file checksums. 1736 1737 @type ninfo: L{objects.Node} 1738 @param ninfo: the node to check 1739 @param nresult: the remote results for the node 1740 @param file_list: required list of files 1741 @param local_cksum: dictionary of local files and their checksums 1742 @param master_files: list of files that only masters should have 1743 1744 """ 1745 node = ninfo.name 1746 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1747 1748 remote_cksum = nresult.get(constants.NV_FILELIST, None) 1749 test = not isinstance(remote_cksum, dict) 1750 _ErrorIf(test, self.ENODEFILECHECK, node, 1751 "node hasn't returned file checksum data") 1752 if test: 1753 return 1754 1755 for file_name in file_list: 1756 node_is_mc = ninfo.master_candidate 1757 must_have = (file_name not in master_files) or node_is_mc 1758 # missing 1759 test1 = file_name not in remote_cksum 1760 # invalid checksum 1761 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] 1762 # existing and good 1763 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] 1764 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, 1765 "file '%s' missing", file_name) 1766 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, 1767 "file '%s' has wrong checksum", file_name) 1768 # not candidate and this is not a must-have file 1769 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, 1770 "file '%s' should not exist on non master" 1771 " candidates (and the file is outdated)", file_name) 1772 # all good, except non-master/non-must have combination 1773 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, 1774 "file '%s' should not exist" 1775 " on non master candidates", file_name)
1776
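# Illustrative sketch (not part of the ganeti.cmdlib source; names are
# hypothetical and the code is commented out so the listing stays unchanged).
# The checksum logic in _VerifyNodeFiles classifies every required file into
# one of three problems: missing, wrong checksum, or present on a node that
# should not carry it.  A self-contained restatement:
#
#   def _SketchClassifyFiles(required, local_cksum, remote_cksum,
#                            is_master_candidate, master_only_files):
#     issues = []
#     for fname in required:
#       must_have = fname not in master_only_files or is_master_candidate
#       if fname not in remote_cksum:
#         if must_have:
#           issues.append((fname, "missing"))
#       elif remote_cksum[fname] != local_cksum[fname]:
#         if must_have:
#           issues.append((fname, "wrong checksum"))
#         else:
#           issues.append((fname, "stale copy on non-candidate"))
#       elif not must_have:
#         issues.append((fname, "present on non-candidate"))
#     return issues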
1777 - def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, 1778 drbd_map):
1779 """Verifies and the node DRBD status. 1780 1781 @type ninfo: L{objects.Node} 1782 @param ninfo: the node to check 1783 @param nresult: the remote results for the node 1784 @param instanceinfo: the dict of instances 1785 @param drbd_helper: the configured DRBD usermode helper 1786 @param drbd_map: the DRBD map as returned by 1787 L{ganeti.config.ConfigWriter.ComputeDRBDMap} 1788 1789 """ 1790 node = ninfo.name 1791 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1792 1793 if drbd_helper: 1794 helper_result = nresult.get(constants.NV_DRBDHELPER, None) 1795 test = (helper_result == None) 1796 _ErrorIf(test, self.ENODEDRBDHELPER, node, 1797 "no drbd usermode helper returned") 1798 if helper_result: 1799 status, payload = helper_result 1800 test = not status 1801 _ErrorIf(test, self.ENODEDRBDHELPER, node, 1802 "drbd usermode helper check unsuccessful: %s", payload) 1803 test = status and (payload != drbd_helper) 1804 _ErrorIf(test, self.ENODEDRBDHELPER, node, 1805 "wrong drbd usermode helper: %s", payload) 1806 1807 # compute the DRBD minors 1808 node_drbd = {} 1809 for minor, instance in drbd_map[node].items(): 1810 test = instance not in instanceinfo 1811 _ErrorIf(test, self.ECLUSTERCFG, None, 1812 "ghost instance '%s' in temporary DRBD map", instance) 1813 # ghost instance should not be running, but otherwise we 1814 # don't give double warnings (both ghost instance and 1815 # unallocated minor in use) 1816 if test: 1817 node_drbd[minor] = (instance, False) 1818 else: 1819 instance = instanceinfo[instance] 1820 node_drbd[minor] = (instance.name, instance.admin_up) 1821 1822 # and now check them 1823 used_minors = nresult.get(constants.NV_DRBDLIST, []) 1824 test = not isinstance(used_minors, (tuple, list)) 1825 _ErrorIf(test, self.ENODEDRBD, node, 1826 "cannot parse drbd status file: %s", str(used_minors)) 1827 if test: 1828 # we cannot check drbd status 1829 return 1830 1831 for minor, (iname, must_exist) in node_drbd.items(): 1832 test = minor not in used_minors and must_exist 1833 _ErrorIf(test, self.ENODEDRBD, node, 1834 "drbd minor %d of instance %s is not active", minor, iname) 1835 for minor in used_minors: 1836 test = minor not in node_drbd 1837 _ErrorIf(test, self.ENODEDRBD, node, 1838 "unallocated drbd minor %d is in use", minor)
1839
1840 - def _UpdateNodeOS(self, ninfo, nresult, nimg):
1841 """Builds the node OS structures. 1842 1843 @type ninfo: L{objects.Node} 1844 @param ninfo: the node to check 1845 @param nresult: the remote results for the node 1846 @param nimg: the node image object 1847 1848 """ 1849 node = ninfo.name 1850 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1851 1852 remote_os = nresult.get(constants.NV_OSLIST, None) 1853 test = (not isinstance(remote_os, list) or 1854 not compat.all(isinstance(v, list) and len(v) == 7 1855 for v in remote_os)) 1856 1857 _ErrorIf(test, self.ENODEOS, node, 1858 "node hasn't returned valid OS data") 1859 1860 nimg.os_fail = test 1861 1862 if test: 1863 return 1864 1865 os_dict = {} 1866 1867 for (name, os_path, status, diagnose, 1868 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]: 1869 1870 if name not in os_dict: 1871 os_dict[name] = [] 1872 1873 # parameters is a list of lists instead of list of tuples due to 1874 # JSON lacking a real tuple type, fix it: 1875 parameters = [tuple(v) for v in parameters] 1876 os_dict[name].append((os_path, status, diagnose, 1877 set(variants), set(parameters), set(api_ver))) 1878 1879 nimg.oslist = os_dict
1880
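# Illustrative example (not part of the ganeti.cmdlib source; the OS name,
# path and parameters below are hypothetical).  NV_OSLIST arrives from the
# node as a list of 7-element lists because JSON has no tuple type, and
# _UpdateNodeOS regroups it by OS name:
#
#   remote_os = [["debootstrap", "/srv/ganeti/os/debootstrap", True, "",
#                 ["default"], [["dhcp", "use DHCP for the NIC"]], [15]]]
#   # nimg.oslist then becomes:
#   # {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
#   #                   set(["default"]),
#   #                   set([("dhcp", "use DHCP for the NIC")]),
#   #                   set([15]))]}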
1881 - def _VerifyNodeOS(self, ninfo, nimg, base):
1882 """Verifies the node OS list. 1883 1884 @type ninfo: L{objects.Node} 1885 @param ninfo: the node to check 1886 @param nimg: the node image object 1887 @param base: the 'template' node we match against (e.g. from the master) 1888 1889 """ 1890 node = ninfo.name 1891 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1892 1893 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" 1894 1895 for os_name, os_data in nimg.oslist.items(): 1896 assert os_data, "Empty OS status for OS %s?!" % os_name 1897 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] 1898 _ErrorIf(not f_status, self.ENODEOS, node, 1899 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag) 1900 _ErrorIf(len(os_data) > 1, self.ENODEOS, node, 1901 "OS '%s' has multiple entries (first one shadows the rest): %s", 1902 os_name, utils.CommaJoin([v[0] for v in os_data])) 1903 # this will catched in backend too 1904 _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api) 1905 and not f_var, self.ENODEOS, node, 1906 "OS %s with API at least %d does not declare any variant", 1907 os_name, constants.OS_API_V15) 1908 # comparisons with the 'base' image 1909 test = os_name not in base.oslist 1910 _ErrorIf(test, self.ENODEOS, node, 1911 "Extra OS %s not present on reference node (%s)", 1912 os_name, base.name) 1913 if test: 1914 continue 1915 assert base.oslist[os_name], "Base node has empty OS status?" 1916 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0] 1917 if not b_status: 1918 # base OS is invalid, skipping 1919 continue 1920 for kind, a, b in [("API version", f_api, b_api), 1921 ("variants list", f_var, b_var), 1922 ("parameters", f_param, b_param)]: 1923 _ErrorIf(a != b, self.ENODEOS, node, 1924 "OS %s %s differs from reference node %s: %s vs. %s", 1925 kind, os_name, base.name, 1926 utils.CommaJoin(a), utils.CommaJoin(b)) 1927 1928 # check any missing OSes 1929 missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) 1930 _ErrorIf(missing, self.ENODEOS, node, 1931 "OSes present on reference node %s but missing on this node: %s", 1932 base.name, utils.CommaJoin(missing))
1933
1934 - def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1935 """Verifies and updates the node volume data. 1936 1937 This function will update a L{NodeImage}'s internal structures 1938 with data from the remote call. 1939 1940 @type ninfo: L{objects.Node} 1941 @param ninfo: the node to check 1942 @param nresult: the remote results for the node 1943 @param nimg: the node image object 1944 @param vg_name: the configured VG name 1945 1946 """ 1947 node = ninfo.name 1948 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1949 1950 nimg.lvm_fail = True 1951 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") 1952 if vg_name is None: 1953 pass 1954 elif isinstance(lvdata, basestring): 1955 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", 1956 utils.SafeEncode(lvdata)) 1957 elif not isinstance(lvdata, dict): 1958 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") 1959 else: 1960 nimg.volumes = lvdata 1961 nimg.lvm_fail = False
1962
1963 - def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1964 """Verifies and updates the node instance list. 1965 1966 If the listing was successful, then updates this node's instance 1967 list. Otherwise, it marks the RPC call as failed for the instance 1968 list key. 1969 1970 @type ninfo: L{objects.Node} 1971 @param ninfo: the node to check 1972 @param nresult: the remote results for the node 1973 @param nimg: the node image object 1974 1975 """ 1976 idata = nresult.get(constants.NV_INSTANCELIST, None) 1977 test = not isinstance(idata, list) 1978 self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed" 1979 " (instancelist): %s", utils.SafeEncode(str(idata))) 1980 if test: 1981 nimg.hyp_fail = True 1982 else: 1983 nimg.instances = idata
1984
1985 - def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1986 """Verifies and computes a node information map 1987 1988 @type ninfo: L{objects.Node} 1989 @param ninfo: the node to check 1990 @param nresult: the remote results for the node 1991 @param nimg: the node image object 1992 @param vg_name: the configured VG name 1993 1994 """ 1995 node = ninfo.name 1996 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 1997 1998 # try to read free memory (from the hypervisor) 1999 hv_info = nresult.get(constants.NV_HVINFO, None) 2000 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info 2001 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") 2002 if not test: 2003 try: 2004 nimg.mfree = int(hv_info["memory_free"]) 2005 except (ValueError, TypeError): 2006 _ErrorIf(True, self.ENODERPC, node, 2007 "node returned invalid nodeinfo, check hypervisor") 2008 2009 # FIXME: devise a free space model for file based instances as well 2010 if vg_name is not None: 2011 test = (constants.NV_VGLIST not in nresult or 2012 vg_name not in nresult[constants.NV_VGLIST]) 2013 _ErrorIf(test, self.ENODELVM, node, 2014 "node didn't return data for the volume group '%s'" 2015 " - it is either missing or broken", vg_name) 2016 if not test: 2017 try: 2018 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name]) 2019 except (ValueError, TypeError): 2020 _ErrorIf(True, self.ENODERPC, node, 2021 "node returned invalid LVM info, check LVM status")
2022
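# Illustrative example (not part of the ganeti.cmdlib source; the node values
# below are hypothetical).  The payloads consumed by _UpdateNodeInfo are plain
# dictionaries, e.g.:
#
#   nresult[constants.NV_HVINFO] == {"memory_free": 2048, "memory_total": 4096}
#   nresult[constants.NV_VGLIST] == {"xenvg": 204800}  # free space per VG
#   # giving nimg.mfree == 2048 and nimg.dfree == 204800 for vg_name "xenvg".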
2023 - def BuildHooksEnv(self):
2024 """Build hooks env. 2025 2026 Cluster-Verify hooks just ran in the post phase and their failure makes 2027 the output be logged in the verify output and the verification to fail. 2028 2029 """ 2030 all_nodes = self.cfg.GetNodeList() 2031 env = { 2032 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) 2033 } 2034 for node in self.cfg.GetAllNodesInfo().values(): 2035 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags()) 2036 2037 return env, [], all_nodes
2038
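# Illustrative example (not part of the ganeti.cmdlib source; node names and
# tags are hypothetical).  The hooks environment built above is a flat dict
# of strings, e.g. for a two-node cluster:
#
#   {"CLUSTER_TAGS": "prod web",
#    "NODE_TAGS_node1.example.com": "rack1",
#    "NODE_TAGS_node2.example.com": ""}
#
# The empty pre-hooks node list together with all_nodes means these hooks run
# in the post phase on every node of the cluster.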
2039 - def Exec(self, feedback_fn):
2040 """Verify integrity of cluster, performing various test on nodes. 2041 2042 """ 2043 self.bad = False 2044 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 2045 verbose = self.op.verbose 2046 self._feedback_fn = feedback_fn 2047 feedback_fn("* Verifying global settings") 2048 for msg in self.cfg.VerifyConfig(): 2049 _ErrorIf(True, self.ECLUSTERCFG, None, msg) 2050 2051 # Check the cluster certificates 2052 for cert_filename in constants.ALL_CERT_FILES: 2053 (errcode, msg) = _VerifyCertificate(cert_filename) 2054 _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) 2055 2056 vg_name = self.cfg.GetVGName() 2057 drbd_helper = self.cfg.GetDRBDHelper() 2058 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors 2059 cluster = self.cfg.GetClusterInfo() 2060 nodelist = utils.NiceSort(self.cfg.GetNodeList()) 2061 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] 2062 instancelist = utils.NiceSort(self.cfg.GetInstanceList()) 2063 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) 2064 for iname in instancelist) 2065 i_non_redundant = [] # Non redundant instances 2066 i_non_a_balanced = [] # Non auto-balanced instances 2067 n_offline = 0 # Count of offline nodes 2068 n_drained = 0 # Count of nodes being drained 2069 node_vol_should = {} 2070 2071 # FIXME: verify OS list 2072 # do local checksums 2073 master_files = [constants.CLUSTER_CONF_FILE] 2074 master_node = self.master_node = self.cfg.GetMasterNode() 2075 master_ip = self.cfg.GetMasterIP() 2076 2077 file_names = ssconf.SimpleStore().GetFileList() 2078 file_names.extend(constants.ALL_CERT_FILES) 2079 file_names.extend(master_files) 2080 if cluster.modify_etc_hosts: 2081 file_names.append(constants.ETC_HOSTS) 2082 2083 local_checksums = utils.FingerprintFiles(file_names) 2084 2085 feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) 2086 node_verify_param = { 2087 constants.NV_FILELIST: file_names, 2088 constants.NV_NODELIST: [node.name for node in nodeinfo 2089 if not node.offline], 2090 constants.NV_HYPERVISOR: hypervisors, 2091 constants.NV_NODENETTEST: [(node.name, node.primary_ip, 2092 node.secondary_ip) for node in nodeinfo 2093 if not node.offline], 2094 constants.NV_INSTANCELIST: hypervisors, 2095 constants.NV_VERSION: None, 2096 constants.NV_HVINFO: self.cfg.GetHypervisorType(), 2097 constants.NV_NODESETUP: None, 2098 constants.NV_TIME: None, 2099 constants.NV_MASTERIP: (master_node, master_ip), 2100 constants.NV_OSLIST: None, 2101 } 2102 2103 if vg_name is not None: 2104 node_verify_param[constants.NV_VGLIST] = None 2105 node_verify_param[constants.NV_LVLIST] = vg_name 2106 node_verify_param[constants.NV_PVLIST] = [vg_name] 2107 node_verify_param[constants.NV_DRBDLIST] = None 2108 2109 if drbd_helper: 2110 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper 2111 2112 # Build our expected cluster state 2113 node_image = dict((node.name, self.NodeImage(offline=node.offline, 2114 name=node.name)) 2115 for node in nodeinfo) 2116 2117 for instance in instancelist: 2118 inst_config = instanceinfo[instance] 2119 2120 for nname in inst_config.all_nodes: 2121 if nname not in node_image: 2122 # ghost node 2123 gnode = self.NodeImage(name=nname) 2124 gnode.ghost = True 2125 node_image[nname] = gnode 2126 2127 inst_config.MapLVsByNode(node_vol_should) 2128 2129 pnode = inst_config.primary_node 2130 node_image[pnode].pinst.append(instance) 2131 2132 for snode in inst_config.secondary_nodes: 2133 nimg = node_image[snode] 2134 nimg.sinst.append(instance) 2135 if pnode not in nimg.sbp: 2136 
nimg.sbp[pnode] = [] 2137 nimg.sbp[pnode].append(instance) 2138 2139 # At this point, we have the in-memory data structures complete, 2140 # except for the runtime information, which we'll gather next 2141 2142 # Due to the way our RPC system works, exact response times cannot be 2143 # guaranteed (e.g. a broken node could run into a timeout). By keeping the 2144 # time before and after executing the request, we can at least have a time 2145 # window. 2146 nvinfo_starttime = time.time() 2147 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, 2148 self.cfg.GetClusterName()) 2149 nvinfo_endtime = time.time() 2150 2151 all_drbd_map = self.cfg.ComputeDRBDMap() 2152 2153 feedback_fn("* Verifying node status") 2154 2155 refos_img = None 2156 2157 for node_i in nodeinfo: 2158 node = node_i.name 2159 nimg = node_image[node] 2160 2161 if node_i.offline: 2162 if verbose: 2163 feedback_fn("* Skipping offline node %s" % (node,)) 2164 n_offline += 1 2165 continue 2166 2167 if node == master_node: 2168 ntype = "master" 2169 elif node_i.master_candidate: 2170 ntype = "master candidate" 2171 elif node_i.drained: 2172 ntype = "drained" 2173 n_drained += 1 2174 else: 2175 ntype = "regular" 2176 if verbose: 2177 feedback_fn("* Verifying node %s (%s)" % (node, ntype)) 2178 2179 msg = all_nvinfo[node].fail_msg 2180 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) 2181 if msg: 2182 nimg.rpc_fail = True 2183 continue 2184 2185 nresult = all_nvinfo[node].payload 2186 2187 nimg.call_ok = self._VerifyNode(node_i, nresult) 2188 self._VerifyNodeNetwork(node_i, nresult) 2189 self._VerifyNodeLVM(node_i, nresult, vg_name) 2190 self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, 2191 master_files) 2192 self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, 2193 all_drbd_map) 2194 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) 2195 2196 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) 2197 self._UpdateNodeInstances(node_i, nresult, nimg) 2198 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) 2199 self._UpdateNodeOS(node_i, nresult, nimg) 2200 if not nimg.os_fail: 2201 if refos_img is None: 2202 refos_img = nimg 2203 self._VerifyNodeOS(node_i, nimg, refos_img) 2204 2205 feedback_fn("* Verifying instance status") 2206 for instance in instancelist: 2207 if verbose: 2208 feedback_fn("* Verifying instance %s" % instance) 2209 inst_config = instanceinfo[instance] 2210 self._VerifyInstance(instance, inst_config, node_image) 2211 inst_nodes_offline = [] 2212 2213 pnode = inst_config.primary_node 2214 pnode_img = node_image[pnode] 2215 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline, 2216 self.ENODERPC, pnode, "instance %s, connection to" 2217 " primary node failed", instance) 2218 2219 if pnode_img.offline: 2220 inst_nodes_offline.append(pnode) 2221 2222 # If the instance is non-redundant we cannot survive losing its primary 2223 # node, so we are not N+1 compliant. On the other hand we have no disk 2224 # templates with more than one secondary so that situation is not well 2225 # supported either. 
2226 # FIXME: does not support file-backed instances 2227 if not inst_config.secondary_nodes: 2228 i_non_redundant.append(instance) 2229 _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT, 2230 instance, "instance has multiple secondary nodes: %s", 2231 utils.CommaJoin(inst_config.secondary_nodes), 2232 code=self.ETYPE_WARNING) 2233 2234 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: 2235 i_non_a_balanced.append(instance) 2236 2237 for snode in inst_config.secondary_nodes: 2238 s_img = node_image[snode] 2239 _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode, 2240 "instance %s, connection to secondary node failed", instance) 2241 2242 if s_img.offline: 2243 inst_nodes_offline.append(snode) 2244 2245 # warn that the instance lives on offline nodes 2246 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, 2247 "instance lives on offline node(s) %s", 2248 utils.CommaJoin(inst_nodes_offline)) 2249 # ... or ghost nodes 2250 for node in inst_config.all_nodes: 2251 _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance, 2252 "instance lives on ghost node %s", node) 2253 2254 feedback_fn("* Verifying orphan volumes") 2255 reserved = utils.FieldSet(*cluster.reserved_lvs) 2256 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) 2257 2258 feedback_fn("* Verifying orphan instances") 2259 self._VerifyOrphanInstances(instancelist, node_image) 2260 2261 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: 2262 feedback_fn("* Verifying N+1 Memory redundancy") 2263 self._VerifyNPlusOneMemory(node_image, instanceinfo) 2264 2265 feedback_fn("* Other Notes") 2266 if i_non_redundant: 2267 feedback_fn(" - NOTICE: %d non-redundant instance(s) found." 2268 % len(i_non_redundant)) 2269 2270 if i_non_a_balanced: 2271 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found." 2272 % len(i_non_a_balanced)) 2273 2274 if n_offline: 2275 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline) 2276 2277 if n_drained: 2278 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained) 2279 2280 return not self.bad
2281
2282 - def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2283 """Analyze the post-hooks' result 2284 2285 This method analyses the hook result, handles it, and sends some 2286 nicely-formatted feedback back to the user. 2287 2288 @param phase: one of L{constants.HOOKS_PHASE_POST} or 2289 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase 2290 @param hooks_results: the results of the multi-node hooks rpc call 2291 @param feedback_fn: function used send feedback back to the caller 2292 @param lu_result: previous Exec result 2293 @return: the new Exec result, based on the previous result 2294 and hook results 2295 2296 """ 2297 # We only really run POST phase hooks, and are only interested in 2298 # their results 2299 if phase == constants.HOOKS_PHASE_POST: 2300 # Used to change hooks' output to proper indentation 2301 indent_re = re.compile('^', re.M) 2302 feedback_fn("* Hooks Results") 2303 assert hooks_results, "invalid result from hooks" 2304 2305 for node_name in hooks_results: 2306 res = hooks_results[node_name] 2307 msg = res.fail_msg 2308 test = msg and not res.offline 2309 self._ErrorIf(test, self.ENODEHOOKS, node_name, 2310 "Communication failure in hooks execution: %s", msg) 2311 if res.offline or msg: 2312 # No need to investigate payload if node is offline or gave an error. 2313 # override manually lu_result here as _ErrorIf only 2314 # overrides self.bad 2315 lu_result = 1 2316 continue 2317 for script, hkr, output in res.payload: 2318 test = hkr == constants.HKR_FAIL 2319 self._ErrorIf(test, self.ENODEHOOKS, node_name, 2320 "Script %s failed, output:", script) 2321 if test: 2322 output = indent_re.sub(' ', output) 2323 feedback_fn("%s" % output) 2324 lu_result = 0 2325 2326 return lu_result
2327
2328 2329 -class LUVerifyDisks(NoHooksLU):
2330 """Verifies the cluster disks status. 2331 2332 """ 2333 REQ_BGL = False 2334
2335 - def ExpandNames(self):
2336 self.needed_locks = { 2337 locking.LEVEL_NODE: locking.ALL_SET, 2338 locking.LEVEL_INSTANCE: locking.ALL_SET, 2339 } 2340 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2341
2342 - def Exec(self, feedback_fn):
2343 """Verify integrity of cluster disks. 2344 2345 @rtype: tuple of three items 2346 @return: a tuple of (dict of node-to-node_error, list of instances 2347 which need activate-disks, dict of instance: (node, volume) for 2348 missing volumes 2349 2350 """ 2351 result = res_nodes, res_instances, res_missing = {}, [], {} 2352 2353 vg_name = self.cfg.GetVGName() 2354 nodes = utils.NiceSort(self.cfg.GetNodeList()) 2355 instances = [self.cfg.GetInstanceInfo(name) 2356 for name in self.cfg.GetInstanceList()] 2357 2358 nv_dict = {} 2359 for inst in instances: 2360 inst_lvs = {} 2361 if (not inst.admin_up or 2362 inst.disk_template not in constants.DTS_NET_MIRROR): 2363 continue 2364 inst.MapLVsByNode(inst_lvs) 2365 # transform { iname: {node: [vol,],},} to {(node, vol): iname} 2366 for node, vol_list in inst_lvs.iteritems(): 2367 for vol in vol_list: 2368 nv_dict[(node, vol)] = inst 2369 2370 if not nv_dict: 2371 return result 2372 2373 node_lvs = self.rpc.call_lv_list(nodes, vg_name) 2374 2375 for node in nodes: 2376 # node_volume 2377 node_res = node_lvs[node] 2378 if node_res.offline: 2379 continue 2380 msg = node_res.fail_msg 2381 if msg: 2382 logging.warning("Error enumerating LVs on node %s: %s", node, msg) 2383 res_nodes[node] = msg 2384 continue 2385 2386 lvs = node_res.payload 2387 for lv_name, (_, _, lv_online) in lvs.items(): 2388 inst = nv_dict.pop((node, lv_name), None) 2389 if (not lv_online and inst is not None 2390 and inst.name not in res_instances): 2391 res_instances.append(inst.name) 2392 2393 # any leftover items in nv_dict are missing LVs, let's arrange the 2394 # data better 2395 for key, inst in nv_dict.iteritems(): 2396 if inst.name not in res_missing: 2397 res_missing[inst.name] = [] 2398 res_missing[inst.name].append(key) 2399 2400 return result
2401
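# Illustrative sketch (not part of the ganeti.cmdlib source; names are
# hypothetical and the code is commented out so the listing stays unchanged).
# The comment in Exec above describes flattening {iname: {node: [vol, ...]}}
# into {(node, vol): iname}; a standalone version of that transform:
#
#   def _SketchFlattenLVs(lvs_by_instance):
#     nv_dict = {}
#     for iname, by_node in lvs_by_instance.items():
#       for node, vols in by_node.items():
#         for vol in vols:
#           nv_dict[(node, vol)] = iname
#     return nv_dict
#
#   # _SketchFlattenLVs({"inst1": {"node1": ["xenvg/lv1", "xenvg/lv2"]}})
#   # -> {("node1", "xenvg/lv1"): "inst1", ("node1", "xenvg/lv2"): "inst1"}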
2402 2403 -class LURepairDiskSizes(NoHooksLU):
2404 """Verifies the cluster disks sizes. 2405 2406 """ 2407 _OP_PARAMS = [("instances", _EmptyList, _TListOf(_TNonEmptyString))] 2408 REQ_BGL = False 2409
2410 - def ExpandNames(self):
2411 if self.op.instances: 2412 self.wanted_names = [] 2413 for name in self.op.instances: 2414 full_name = _ExpandInstanceName(self.cfg, name) 2415 self.wanted_names.append(full_name) 2416 self.needed_locks = { 2417 locking.LEVEL_NODE: [], 2418 locking.LEVEL_INSTANCE: self.wanted_names, 2419 } 2420 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE 2421 else: 2422 self.wanted_names = None 2423 self.needed_locks = { 2424 locking.LEVEL_NODE: locking.ALL_SET, 2425 locking.LEVEL_INSTANCE: locking.ALL_SET, 2426 } 2427 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2428
2429 - def DeclareLocks(self, level):
2430 if level == locking.LEVEL_NODE and self.wanted_names is not None: 2431 self._LockInstancesNodes(primary_only=True)
2432
2433 - def CheckPrereq(self):
2434 """Check prerequisites. 2435 2436 This only checks the optional instance list against the existing names. 2437 2438 """ 2439 if self.wanted_names is None: 2440 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] 2441 2442 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name 2443 in self.wanted_names]
2444
2445 - def _EnsureChildSizes(self, disk):
2446 """Ensure children of the disk have the needed disk size. 2447 2448 This is valid mainly for DRBD8 and fixes an issue where the 2449 children have smaller disk size. 2450 2451 @param disk: an L{ganeti.objects.Disk} object 2452 2453 """ 2454 if disk.dev_type == constants.LD_DRBD8: 2455 assert disk.children, "Empty children for DRBD8?" 2456 fchild = disk.children[0] 2457 mismatch = fchild.size < disk.size 2458 if mismatch: 2459 self.LogInfo("Child disk has size %d, parent %d, fixing", 2460 fchild.size, disk.size) 2461 fchild.size = disk.size 2462 2463 # and we recurse on this child only, not on the metadev 2464 return self._EnsureChildSizes(fchild) or mismatch 2465 else: 2466 return False
2467
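# Illustrative worked example (not part of the ganeti.cmdlib source; the sizes
# are hypothetical).  _EnsureChildSizes only grows the first (data) child of a
# DRBD8 disk and then recurses on that child; the metadata child is left
# alone.  Given a DRBD8 disk recorded at 1024 MiB whose data child is recorded
# at 1000 MiB, the call bumps the child to 1024 MiB and returns True (a change
# was made); for any non-DRBD8 disk it simply returns False.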
2468 - def Exec(self, feedback_fn):
2469 """Verify the size of cluster disks. 2470 2471 """ 2472 # TODO: check child disks too 2473 # TODO: check differences in size between primary/secondary nodes 2474 per_node_disks = {} 2475 for instance in self.wanted_instances: 2476 pnode = instance.primary_node 2477 if pnode not in per_node_disks: 2478 per_node_disks[pnode] = [] 2479 for idx, disk in enumerate(instance.disks): 2480 per_node_disks[pnode].append((instance, idx, disk)) 2481 2482 changed = [] 2483 for node, dskl in per_node_disks.items(): 2484 newl = [v[2].Copy() for v in dskl] 2485 for dsk in newl: 2486 self.cfg.SetDiskID(dsk, node) 2487 result = self.rpc.call_blockdev_getsizes(node, newl) 2488 if result.fail_msg: 2489 self.LogWarning("Failure in blockdev_getsizes call to node" 2490 " %s, ignoring", node) 2491 continue 2492 if len(result.data) != len(dskl): 2493 self.LogWarning("Invalid result from node %s, ignoring node results", 2494 node) 2495 continue 2496 for ((instance, idx, disk), size) in zip(dskl, result.data): 2497 if size is None: 2498 self.LogWarning("Disk %d of instance %s did not return size" 2499 " information, ignoring", idx, instance.name) 2500 continue 2501 if not isinstance(size, (int, long)): 2502 self.LogWarning("Disk %d of instance %s did not return valid" 2503 " size information, ignoring", idx, instance.name) 2504 continue 2505 size = size >> 20 2506 if size != disk.size: 2507 self.LogInfo("Disk %d of instance %s has mismatched size," 2508 " correcting: recorded %d, actual %d", idx, 2509 instance.name, disk.size, size) 2510 disk.size = size 2511 self.cfg.Update(instance, feedback_fn) 2512 changed.append((instance.name, idx, size)) 2513 if self._EnsureChildSizes(disk): 2514 self.cfg.Update(instance, feedback_fn) 2515 changed.append((instance.name, idx, disk.size)) 2516 return changed
2517
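# Illustrative note (not part of the ganeti.cmdlib source).  The "size >> 20"
# in Exec above converts the byte count returned by call_blockdev_getsizes
# into the MiB units stored in disk.size, e.g.:
#
#   10737418240 >> 20  ==  10240   # a 10 GiB block device -> 10240 MiB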
2518 2519 -class LURenameCluster(LogicalUnit):
2520 """Rename the cluster. 2521 2522 """ 2523 HPATH = "cluster-rename" 2524 HTYPE = constants.HTYPE_CLUSTER 2525 _OP_PARAMS = [("name", _NoDefault, _TNonEmptyString)] 2526
2527 - def BuildHooksEnv(self):
2528 """Build hooks env. 2529 2530 """ 2531 env = { 2532 "OP_TARGET": self.cfg.GetClusterName(), 2533 "NEW_NAME": self.op.name, 2534 } 2535 mn = self.cfg.GetMasterNode() 2536 all_nodes = self.cfg.GetNodeList() 2537 return env, [mn], all_nodes
2538
2539 - def CheckPrereq(self):
2540 """Verify that the passed name is a valid one. 2541 2542 """ 2543 hostname = netutils.GetHostInfo(self.op.name) 2544 2545 new_name = hostname.name 2546 self.ip = new_ip = hostname.ip 2547 old_name = self.cfg.GetClusterName() 2548 old_ip = self.cfg.GetMasterIP() 2549 if new_name == old_name and new_ip == old_ip: 2550 raise errors.OpPrereqError("Neither the name nor the IP address of the" 2551 " cluster has changed", 2552 errors.ECODE_INVAL) 2553 if new_ip != old_ip: 2554 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): 2555 raise errors.OpPrereqError("The given cluster IP address (%s) is" 2556 " reachable on the network. Aborting." % 2557 new_ip, errors.ECODE_NOTUNIQUE) 2558 2559 self.op.name = new_name
2560
2561 - def Exec(self, feedback_fn):
2562 """Rename the cluster. 2563 2564 """ 2565 clustername = self.op.name 2566 ip = self.ip 2567 2568 # shutdown the master IP 2569 master = self.cfg.GetMasterNode() 2570 result = self.rpc.call_node_stop_master(master, False) 2571 result.Raise("Could not disable the master role") 2572 2573 try: 2574 cluster = self.cfg.GetClusterInfo() 2575 cluster.cluster_name = clustername 2576 cluster.master_ip = ip 2577 self.cfg.Update(cluster, feedback_fn) 2578 2579 # update the known hosts file 2580 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) 2581 node_list = self.cfg.GetNodeList() 2582 try: 2583 node_list.remove(master) 2584 except ValueError: 2585 pass 2586 result = self.rpc.call_upload_file(node_list, 2587 constants.SSH_KNOWN_HOSTS_FILE) 2588 for to_node, to_result in result.iteritems(): 2589 msg = to_result.fail_msg 2590 if msg: 2591 msg = ("Copy of file %s to node %s failed: %s" % 2592 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg)) 2593 self.proc.LogWarning(msg) 2594 2595 finally: 2596 result = self.rpc.call_node_start_master(master, False, False) 2597 msg = result.fail_msg 2598 if msg: 2599 self.LogWarning("Could not re-enable the master role on" 2600 " the master, please restart manually: %s", msg) 2601 2602 return clustername
2603
2604 2605 -class LUSetClusterParams(LogicalUnit):
2606 """Change the parameters of the cluster. 2607 2608 """ 2609 HPATH = "cluster-modify" 2610 HTYPE = constants.HTYPE_CLUSTER 2611 _OP_PARAMS = [ 2612 ("vg_name", None, _TMaybeString), 2613 ("enabled_hypervisors", None, 2614 _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)), 2615 ("hvparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), 2616 ("beparams", None, _TOr(_TDict, _TNone)), 2617 ("os_hvp", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), 2618 ("osparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), 2619 ("candidate_pool_size", None, _TOr(_TStrictPositiveInt, _TNone)), 2620 ("uid_pool", None, _NoType), 2621 ("add_uids", None, _NoType), 2622 ("remove_uids", None, _NoType), 2623 ("maintain_node_health", None, _TMaybeBool), 2624 ("nicparams", None, _TOr(_TDict, _TNone)), 2625 ("drbd_helper", None, _TOr(_TString, _TNone)), 2626 ("default_iallocator", None, _TMaybeString), 2627 ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)), 2628 ("hidden_os", None, _TOr(_TListOf(\ 2629 _TAnd(_TList, 2630 _TIsLength(2), 2631 _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))), 2632 _TNone)), 2633 ("blacklisted_os", None, _TOr(_TListOf(\ 2634 _TAnd(_TList, 2635 _TIsLength(2), 2636 _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))), 2637 _TNone)), 2638 ] 2639 REQ_BGL = False 2640
2641 - def CheckArguments(self):
2642 """Check parameters 2643 2644 """ 2645 if self.op.uid_pool: 2646 uidpool.CheckUidPool(self.op.uid_pool) 2647 2648 if self.op.add_uids: 2649 uidpool.CheckUidPool(self.op.add_uids) 2650 2651 if self.op.remove_uids: 2652 uidpool.CheckUidPool(self.op.remove_uids)
2653
2654 - def ExpandNames(self):
2655 # FIXME: in the future maybe other cluster params won't require checking on 2656 # all nodes to be modified. 2657 self.needed_locks = { 2658 locking.LEVEL_NODE: locking.ALL_SET, 2659 } 2660 self.share_locks[locking.LEVEL_NODE] = 1
2661
2662 - def BuildHooksEnv(self):
2663 """Build hooks env. 2664 2665 """ 2666 env = { 2667 "OP_TARGET": self.cfg.GetClusterName(), 2668 "NEW_VG_NAME": self.op.vg_name, 2669 } 2670 mn = self.cfg.GetMasterNode() 2671 return env, [mn], [mn]
2672
2673 - def CheckPrereq(self):
2674 """Check prerequisites. 2675 2676 This checks whether the given params don't conflict and 2677 if the given volume group is valid. 2678 2679 """ 2680 if self.op.vg_name is not None and not self.op.vg_name: 2681 if self.cfg.HasAnyDiskOfType(constants.LD_LV): 2682 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based" 2683 " instances exist", errors.ECODE_INVAL) 2684 2685 if self.op.drbd_helper is not None and not self.op.drbd_helper: 2686 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8): 2687 raise errors.OpPrereqError("Cannot disable drbd helper while" 2688 " drbd-based instances exist", 2689 errors.ECODE_INVAL) 2690 2691 node_list = self.acquired_locks[locking.LEVEL_NODE] 2692 2693 # if vg_name not None, checks given volume group on all nodes 2694 if self.op.vg_name: 2695 vglist = self.rpc.call_vg_list(node_list) 2696 for node in node_list: 2697 msg = vglist[node].fail_msg 2698 if msg: 2699 # ignoring down node 2700 self.LogWarning("Error while gathering data on node %s" 2701 " (ignoring node): %s", node, msg) 2702 continue 2703 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload, 2704 self.op.vg_name, 2705 constants.MIN_VG_SIZE) 2706 if vgstatus: 2707 raise errors.OpPrereqError("Error on node '%s': %s" % 2708 (node, vgstatus), errors.ECODE_ENVIRON) 2709 2710 if self.op.drbd_helper: 2711 # checks given drbd helper on all nodes 2712 helpers = self.rpc.call_drbd_helper(node_list) 2713 for node in node_list: 2714 ninfo = self.cfg.GetNodeInfo(node) 2715 if ninfo.offline: 2716 self.LogInfo("Not checking drbd helper on offline node %s", node) 2717 continue 2718 msg = helpers[node].fail_msg 2719 if msg: 2720 raise errors.OpPrereqError("Error checking drbd helper on node" 2721 " '%s': %s" % (node, msg), 2722 errors.ECODE_ENVIRON) 2723 node_helper = helpers[node].payload 2724 if node_helper != self.op.drbd_helper: 2725 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" % 2726 (node, node_helper), errors.ECODE_ENVIRON) 2727 2728 self.cluster = cluster = self.cfg.GetClusterInfo() 2729 # validate params changes 2730 if self.op.beparams: 2731 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) 2732 self.new_beparams = cluster.SimpleFillBE(self.op.beparams) 2733 2734 if self.op.nicparams: 2735 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) 2736 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) 2737 objects.NIC.CheckParameterSyntax(self.new_nicparams) 2738 nic_errors = [] 2739 2740 # check all instances for consistency 2741 for instance in self.cfg.GetAllInstancesInfo().values(): 2742 for nic_idx, nic in enumerate(instance.nics): 2743 params_copy = copy.deepcopy(nic.nicparams) 2744 params_filled = objects.FillDict(self.new_nicparams, params_copy) 2745 2746 # check parameter syntax 2747 try: 2748 objects.NIC.CheckParameterSyntax(params_filled) 2749 except errors.ConfigurationError, err: 2750 nic_errors.append("Instance %s, nic/%d: %s" % 2751 (instance.name, nic_idx, err)) 2752 2753 # if we're moving instances to routed, check that they have an ip 2754 target_mode = params_filled[constants.NIC_MODE] 2755 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: 2756 nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % 2757 (instance.name, nic_idx)) 2758 if nic_errors: 2759 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % 2760 "\n".join(nic_errors)) 2761 2762 # hypervisor list/parameters 2763 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) 2764 if 
self.op.hvparams: 2765 for hv_name, hv_dict in self.op.hvparams.items(): 2766 if hv_name not in self.new_hvparams: 2767 self.new_hvparams[hv_name] = hv_dict 2768 else: 2769 self.new_hvparams[hv_name].update(hv_dict) 2770 2771 # os hypervisor parameters 2772 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) 2773 if self.op.os_hvp: 2774 for os_name, hvs in self.op.os_hvp.items(): 2775 if os_name not in self.new_os_hvp: 2776 self.new_os_hvp[os_name] = hvs 2777 else: 2778 for hv_name, hv_dict in hvs.items(): 2779 if hv_name not in self.new_os_hvp[os_name]: 2780 self.new_os_hvp[os_name][hv_name] = hv_dict 2781 else: 2782 self.new_os_hvp[os_name][hv_name].update(hv_dict) 2783 2784 # os parameters 2785 self.new_osp = objects.FillDict(cluster.osparams, {}) 2786 if self.op.osparams: 2787 for os_name, osp in self.op.osparams.items(): 2788 if os_name not in self.new_osp: 2789 self.new_osp[os_name] = {} 2790 2791 self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp, 2792 use_none=True) 2793 2794 if not self.new_osp[os_name]: 2795 # we removed all parameters 2796 del self.new_osp[os_name] 2797 else: 2798 # check the parameter validity (remote check) 2799 _CheckOSParams(self, False, [self.cfg.GetMasterNode()], 2800 os_name, self.new_osp[os_name]) 2801 2802 # changes to the hypervisor list 2803 if self.op.enabled_hypervisors is not None: 2804 self.hv_list = self.op.enabled_hypervisors 2805 for hv in self.hv_list: 2806 # if the hypervisor doesn't already exist in the cluster 2807 # hvparams, we initialize it to empty, and then (in both 2808 # cases) we make sure to fill the defaults, as we might not 2809 # have a complete defaults list if the hypervisor wasn't 2810 # enabled before 2811 if hv not in new_hvp: 2812 new_hvp[hv] = {} 2813 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv]) 2814 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES) 2815 else: 2816 self.hv_list = cluster.enabled_hypervisors 2817 2818 if self.op.hvparams or self.op.enabled_hypervisors is not None: 2819 # either the enabled list has changed, or the parameters have, validate 2820 for hv_name, hv_params in self.new_hvparams.items(): 2821 if ((self.op.hvparams and hv_name in self.op.hvparams) or 2822 (self.op.enabled_hypervisors and 2823 hv_name in self.op.enabled_hypervisors)): 2824 # either this is a new hypervisor, or its parameters have changed 2825 hv_class = hypervisor.GetHypervisor(hv_name) 2826 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 2827 hv_class.CheckParameterSyntax(hv_params) 2828 _CheckHVParams(self, node_list, hv_name, hv_params) 2829 2830 if self.op.os_hvp: 2831 # no need to check any newly-enabled hypervisors, since the 2832 # defaults have already been checked in the above code-block 2833 for os_name, os_hvp in self.new_os_hvp.items(): 2834 for hv_name, hv_params in os_hvp.items(): 2835 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 2836 # we need to fill in the new os_hvp on top of the actual hv_p 2837 cluster_defaults = self.new_hvparams.get(hv_name, {}) 2838 new_osp = objects.FillDict(cluster_defaults, hv_params) 2839 hv_class = hypervisor.GetHypervisor(hv_name) 2840 hv_class.CheckParameterSyntax(new_osp) 2841 _CheckHVParams(self, node_list, hv_name, new_osp) 2842 2843 if self.op.default_iallocator: 2844 alloc_script = utils.FindFile(self.op.default_iallocator, 2845 constants.IALLOCATOR_SEARCH_PATH, 2846 os.path.isfile) 2847 if alloc_script is None: 2848 raise errors.OpPrereqError("Invalid default iallocator script '%s'" 2849 " 
specified" % self.op.default_iallocator, 2850 errors.ECODE_INVAL)
2851
2852 - def Exec(self, feedback_fn):
2853 """Change the parameters of the cluster. 2854 2855 """ 2856 if self.op.vg_name is not None: 2857 new_volume = self.op.vg_name 2858 if not new_volume: 2859 new_volume = None 2860 if new_volume != self.cfg.GetVGName(): 2861 self.cfg.SetVGName(new_volume) 2862 else: 2863 feedback_fn("Cluster LVM configuration already in desired" 2864 " state, not changing") 2865 if self.op.drbd_helper is not None: 2866 new_helper = self.op.drbd_helper 2867 if not new_helper: 2868 new_helper = None 2869 if new_helper != self.cfg.GetDRBDHelper(): 2870 self.cfg.SetDRBDHelper(new_helper) 2871 else: 2872 feedback_fn("Cluster DRBD helper already in desired state," 2873 " not changing") 2874 if self.op.hvparams: 2875 self.cluster.hvparams = self.new_hvparams 2876 if self.op.os_hvp: 2877 self.cluster.os_hvp = self.new_os_hvp 2878 if self.op.enabled_hypervisors is not None: 2879 self.cluster.hvparams = self.new_hvparams 2880 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors 2881 if self.op.beparams: 2882 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams 2883 if self.op.nicparams: 2884 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams 2885 if self.op.osparams: 2886 self.cluster.osparams = self.new_osp 2887 2888 if self.op.candidate_pool_size is not None: 2889 self.cluster.candidate_pool_size = self.op.candidate_pool_size 2890 # we need to update the pool size here, otherwise the save will fail 2891 _AdjustCandidatePool(self, []) 2892 2893 if self.op.maintain_node_health is not None: 2894 self.cluster.maintain_node_health = self.op.maintain_node_health 2895 2896 if self.op.add_uids is not None: 2897 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids) 2898 2899 if self.op.remove_uids is not None: 2900 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids) 2901 2902 if self.op.uid_pool is not None: 2903 self.cluster.uid_pool = self.op.uid_pool 2904 2905 if self.op.default_iallocator is not None: 2906 self.cluster.default_iallocator = self.op.default_iallocator 2907 2908 if self.op.reserved_lvs is not None: 2909 self.cluster.reserved_lvs = self.op.reserved_lvs 2910 2911 def helper_os(aname, mods, desc): 2912 desc += " OS list" 2913 lst = getattr(self.cluster, aname) 2914 for key, val in mods: 2915 if key == constants.DDM_ADD: 2916 if val in lst: 2917 feedback_fn("OS %s already in %s, ignoring", val, desc) 2918 else: 2919 lst.append(val) 2920 elif key == constants.DDM_REMOVE: 2921 if val in lst: 2922 lst.remove(val) 2923 else: 2924 feedback_fn("OS %s not found in %s, ignoring", val, desc) 2925 else: 2926 raise errors.ProgrammerError("Invalid modification '%s'" % key)
2927 2928 if self.op.hidden_os: 2929 helper_os("hidden_os", self.op.hidden_os, "hidden") 2930 2931 if self.op.blacklisted_os: 2932 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") 2933 2934 self.cfg.Update(self.cluster, feedback_fn)
2935
2936 2937 -def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2938 """Distribute additional files which are part of the cluster configuration. 2939 2940 ConfigWriter takes care of distributing the config and ssconf files, but 2941 there are more files which should be distributed to all nodes. This function 2942 makes sure those are copied. 2943 2944 @param lu: calling logical unit 2945 @param additional_nodes: list of nodes not in the config to distribute to 2946 2947 """ 2948 # 1. Gather target nodes 2949 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) 2950 dist_nodes = lu.cfg.GetOnlineNodeList() 2951 if additional_nodes is not None: 2952 dist_nodes.extend(additional_nodes) 2953 if myself.name in dist_nodes: 2954 dist_nodes.remove(myself.name) 2955 2956 # 2. Gather files to distribute 2957 dist_files = set([constants.ETC_HOSTS, 2958 constants.SSH_KNOWN_HOSTS_FILE, 2959 constants.RAPI_CERT_FILE, 2960 constants.RAPI_USERS_FILE, 2961 constants.CONFD_HMAC_KEY, 2962 constants.CLUSTER_DOMAIN_SECRET_FILE, 2963 ]) 2964 2965 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors 2966 for hv_name in enabled_hypervisors: 2967 hv_class = hypervisor.GetHypervisor(hv_name) 2968 dist_files.update(hv_class.GetAncillaryFiles()) 2969 2970 # 3. Perform the files upload 2971 for fname in dist_files: 2972 if os.path.exists(fname): 2973 result = lu.rpc.call_upload_file(dist_nodes, fname) 2974 for to_node, to_result in result.items(): 2975 msg = to_result.fail_msg 2976 if msg: 2977 msg = ("Copy of file %s to node %s failed: %s" % 2978 (fname, to_node, msg)) 2979 lu.proc.LogWarning(msg)
2980
2981 2982 -class LURedistributeConfig(NoHooksLU):
2983 """Force the redistribution of cluster configuration. 2984 2985 This is a very simple LU. 2986 2987 """ 2988 REQ_BGL = False 2989
2990 - def ExpandNames(self):
2991 self.needed_locks = { 2992 locking.LEVEL_NODE: locking.ALL_SET, 2993 } 2994 self.share_locks[locking.LEVEL_NODE] = 1
2995
2996 - def Exec(self, feedback_fn):
2997 """Redistribute the configuration. 2998 2999 """ 3000 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn) 3001 _RedistributeAncillaryFiles(self)
3002
3003 3004 -def _WaitForSync(lu, instance, disks=None, oneshot=False):
3005 """Sleep and poll for an instance's disk to sync. 3006 3007 """ 3008 if not instance.disks or disks is not None and not disks: 3009 return True 3010 3011 disks = _ExpandCheckDisks(instance, disks) 3012 3013 if not oneshot: 3014 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name) 3015 3016 node = instance.primary_node 3017 3018 for dev in disks: 3019 lu.cfg.SetDiskID(dev, node) 3020 3021 # TODO: Convert to utils.Retry 3022 3023 retries = 0 3024 degr_retries = 10 # in seconds, as we sleep 1 second each time 3025 while True: 3026 max_time = 0 3027 done = True 3028 cumul_degraded = False 3029 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks) 3030 msg = rstats.fail_msg 3031 if msg: 3032 lu.LogWarning("Can't get any data from node %s: %s", node, msg) 3033 retries += 1 3034 if retries >= 10: 3035 raise errors.RemoteError("Can't contact node %s for mirror data," 3036 " aborting." % node) 3037 time.sleep(6) 3038 continue 3039 rstats = rstats.payload 3040 retries = 0 3041 for i, mstat in enumerate(rstats): 3042 if mstat is None: 3043 lu.LogWarning("Can't compute data for node %s/%s", 3044 node, disks[i].iv_name) 3045 continue 3046 3047 cumul_degraded = (cumul_degraded or 3048 (mstat.is_degraded and mstat.sync_percent is None)) 3049 if mstat.sync_percent is not None: 3050 done = False 3051 if mstat.estimated_time is not None: 3052 rem_time = ("%s remaining (estimated)" % 3053 utils.FormatSeconds(mstat.estimated_time)) 3054 max_time = mstat.estimated_time 3055 else: 3056 rem_time = "no time estimate" 3057 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % 3058 (disks[i].iv_name, mstat.sync_percent, rem_time)) 3059 3060 # if we're done but degraded, let's do a few small retries, to 3061 # make sure we see a stable and not transient situation; therefore 3062 # we force restart of the loop 3063 if (done or oneshot) and cumul_degraded and degr_retries > 0: 3064 logging.info("Degraded disks found, %d retries left", degr_retries) 3065 degr_retries -= 1 3066 time.sleep(1) 3067 continue 3068 3069 if done or oneshot: 3070 break 3071 3072 time.sleep(min(60, max_time)) 3073 3074 if done: 3075 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name) 3076 return not cumul_degraded
3077
3078 3079 -def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3080 """Check that mirrors are not degraded. 3081 3082 The ldisk parameter, if True, will change the test from the 3083 is_degraded attribute (which represents overall non-ok status for 3084 the device(s)) to the ldisk (representing the local storage status). 3085 3086 """ 3087 lu.cfg.SetDiskID(dev, node) 3088 3089 result = True 3090 3091 if on_primary or dev.AssembleOnSecondary(): 3092 rstats = lu.rpc.call_blockdev_find(node, dev) 3093 msg = rstats.fail_msg 3094 if msg: 3095 lu.LogWarning("Can't find disk on node %s: %s", node, msg) 3096 result = False 3097 elif not rstats.payload: 3098 lu.LogWarning("Can't find disk on node %s", node) 3099 result = False 3100 else: 3101 if ldisk: 3102 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 3103 else: 3104 result = result and not rstats.payload.is_degraded 3105 3106 if dev.children: 3107 for child in dev.children: 3108 result = result and _CheckDiskConsistency(lu, child, node, on_primary