Package ganeti :: Module cmdlib
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the master-side code.""" 
  23   
  24  # pylint: disable-msg=W0201 
  25   
  26  # W0201 since most LU attributes are defined in CheckPrereq or similar 
  27  # functions 
  28   
  29  import os 
  30  import os.path 
  31  import time 
  32  import re 
  33  import platform 
  34  import logging 
  35  import copy 
  36  import OpenSSL 
  37   
  38  from ganeti import ssh 
  39  from ganeti import utils 
  40  from ganeti import errors 
  41  from ganeti import hypervisor 
  42  from ganeti import locking 
  43  from ganeti import constants 
  44  from ganeti import objects 
  45  from ganeti import serializer 
  46  from ganeti import ssconf 
  47  from ganeti import uidpool 
  48  from ganeti import compat 
class LogicalUnit(object):
  """Base class for all logical units.

  A concrete LU is expected to:
    - provide ExpandNames
    - provide CheckPrereq (unless it is built from tasklets)
    - provide Exec (unless it is built from tasklets)
    - provide BuildHooksEnv
    - override HPATH and HTYPE
    - optionally override its run requirements:
        REQ_BGL: the LU must hold the Big Ganeti Lock exclusively

  All commands need root permissions.

  @ivar dry_run_result: value (if any) handed back to the caller when the
      opcode runs in dry-run mode (requested through the opcode's dry_run
      parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Set up the common LU state and validate required opcode fields.

    Derived classes that need extra opcode validation should do it in
    L{CheckArguments}, which is invoked at the end of this constructor.

    @raise errors.OpPrereqError: if any attribute named in C{_OP_REQP} is
        missing from (or None on) the opcode

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc

    # Locking requirements, as communicated to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Helper functions refuse to run unless the LU opted in through this dict
    self.recalculate_locks = {}
    self.__ssh = None

    # Logging shortcuts borrowed from the processor
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103

    # Dry-run support
    self.dry_run_result = None

    # Generic debug attribute: anything but an integer is reset to zero
    has_valid_level = (hasattr(self.op, "debug_level") and
                       isinstance(self.op.debug_level, int))
    if not has_valid_level:
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for required in self._OP_REQP:
      if getattr(op, required, None) is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   required, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Return the SshRunner object, creating it on first use.

    """
    runner = self.__ssh
    if not runner:
      runner = ssh.SshRunner(self.cfg.GetClusterName())
      self.__ssh = runner
    return runner

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Perform purely syntactic validation of the opcode arguments.

    Only checks that need no cluster state belong here. ExpandNames and/or
    CheckPrereq could do the same work, but keeping it separate is better
    because:

      - ExpandNames stays a purely lock-related function
      - CheckPrereq only runs after the locks have been acquired (and
        possibly waited for)

    The method may modify self.op so that later methods no longer have to
    worry about missing parameters.

    """

  def ExpandNames(self):
    """Canonicalise opcode names and declare the locks this LU needs.

    This runs before the opcode starts executing. It should bring all
    opcode parameters into canonical form (e.g. a short node name must be
    fully expanded once this method has succeeded), so that locking, hooks,
    logging, etc. work correctly.

    Implementations must also fill in self.needed_locks: a dict mapping lock
    levels to the list of lock names needed at that level. Rules:

      - use an empty dict if no lock is needed at all
      - leave out any level at which no lock is needed
      - never add an entry for the BGL level
      - use locking.ALL_SET as the value to get every lock of a level

    To acquire a level's locks in shared rather than exclusive mode, set a
    true value (usually 1) for that level in self.share_locks. Locks are
    exclusive by default.

    A list of tasklets may be defined here as well; they are then executed
    in order in place of the LU-level CheckPrereq and Exec, if the LU does
    not define those.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # Only concurrent LUs are forced to implement this method, so that old
    # LUs don't all have to be converted at the same time.
    if not self.REQ_BGL:
      raise NotImplementedError
    # Exclusive LUs don't need locks.
    self.needed_locks = {}

  def DeclareLocks(self, level):
    """Declare the locks needed at one level, just before acquiring them.

    Most LUs can state all their locking needs in ExpandNames, but sometimes
    the locks of one level can only be computed once earlier ones are held.
    This hook is called right before the locks of a given level are
    acquired, after those of the lower levels have been, and may modify
    self.needed_locks. The default implementation does nothing.

    It is only called for levels that already have an entry in
    self.needed_locks.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Verify that the prerequisites for running this LU hold.

    Inter-node communication is allowed, but the method should be
    idempotent: no cluster or system changes are allowed.

    Unmet prerequisites should be reported by raising errors.OpPrereqError;
    the return value is ignored.

    Any opcode parameter not already canonicalised by ExpandNames should be
    brought into canonical form here.

    The default implementation delegates to the tasklets, if any.

    """
    if self.tasklets is None:
      raise NotImplementedError

    total = len(self.tasklets)
    for (position, tasklet) in enumerate(self.tasklets):
      logging.debug("Checking prerequisites for tasklet %s/%s",
                    position + 1, total)
      tasklet.CheckPrereq()

  def Exec(self, feedback_fn):
    """Carry out the actual work of the LU.

    Failures that are expected, or somewhat dealt with in code, should be
    reported by raising errors.OpExecError.

    The default implementation runs the tasklets, if any, in order.

    """
    if self.tasklets is None:
      raise NotImplementedError

    total = len(self.tasklets)
    for (position, tasklet) in enumerate(self.tasklets):
      logging.debug("Executing tasklet %s/%s", position + 1, total)
      tasklet.Exec(feedback_fn)

  def BuildHooksEnv(self):
    """Build the hooks environment for this LU.

    Implementations should return a three-element tuple: a dict with the
    environment used to run this LU's hooks, the list of node names on
    which the hook runs before execution, and the list of node names on
    which it runs after execution.

    Dict keys must not carry the 'GANETI_' prefix; the hooks runner adds
    it, along with some additional keys. An LU without any environment
    should return an empty dict (and not None).

    "No nodes" should be expressed as an empty list (and not None).

    This method is not called at all if the LU class has HPATH set to None.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Let the LU react to the results of a hooks phase.

    This is called every time a hooks phase has been executed. An LU may
    override it to alter its result based on the hook results; the default
    passes the previous result back unchanged.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # The API must be kept, so the unused-argument and could-be-a-function
    # warnings are silenced
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Expand self.op.instance_name and declare its instance-level lock.

    Many LUs take an instance name in self.op.instance_name, and need to
    expand it and declare the expanded name for locking. This helper
    expands the name, stores it back into self.op.instance_name and records
    it in self.needed_locks (creating that dict if it is still None).

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"

    full_name = _ExpandInstanceName(self.cfg, self.op.instance_name)
    self.op.instance_name = full_name
    self.needed_locks[locking.LEVEL_INSTANCE] = full_name

  def _LockInstancesNodes(self, primary_only=False):
    """Declare the nodes of the already-locked instances for locking.

    This should be called once one or more instances have been locked. It
    fills self.needed_locks[locking.LEVEL_NODE] with the primary (and,
    unless primary_only is set, secondary) nodes of every instance listed
    in self.acquired_locks[locking.LEVEL_INSTANCE].

    For safety it only works if self.recalculate_locks[locking.LEVEL_NODE]
    is set; that entry is removed once the node list has been computed.

    It should be called from DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    node_level = locking.LEVEL_NODE

    assert node_level in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    collected = []
    for locked_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      inst = self.context.cfg.GetInstanceInfo(locked_name)
      collected.append(inst.primary_node)
      if not primary_only:
        collected.extend(inst.secondary_nodes)

    # The recalculation mode decides whether the computed list replaces or
    # extends what was declared before; any other mode leaves it untouched
    if self.recalculate_locks[node_level] == constants.LOCKS_REPLACE:
      self.needed_locks[node_level] = collected
    elif self.recalculate_locks[node_level] == constants.LOCKS_APPEND:
      self.needed_locks[node_level].extend(collected)

    del self.recalculate_locks[node_level]
358
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This method must never be called, so it unconditionally raises.

    @raise AssertionError: always

    """
    # Raised explicitly instead of "assert False": assert statements are
    # removed when Python runs with -O, which would make this silently
    # return None.
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
377
class Tasklet:
  """Base class for tasklets.

  A tasklet is a subcomponent of an LU. An LU may be made up entirely of
  tasklets, or combine legacy code with tasklets. Tasklets know nothing
  about locks; all locking must be handled by the owning LU.

  Subclasses must:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    """Remember the owning LU and copy its config and RPC handles.

    @param lu: the logical unit this tasklet belongs to

    """
    self.lu = lu

    # Shortcuts
    self.cfg, self.rpc = lu.cfg, lu.rpc

  def CheckPrereq(self):
    """Verify the prerequisites for this tasklet.

    Implementations should check whether the prerequisites for executing
    the tasklet are fulfilled. Inter-node communication is allowed, but the
    method should be idempotent: no cluster or system changes are allowed.

    Unmet prerequisites should be reported by raising errors.OpPrereqError;
    the return value is ignored.

    Parameters not yet in canonical form should be canonicalised here.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Run the tasklet.

    Implementations do the actual work here. Failures that are expected,
    or somewhat dealt with in code, should be reported by raising
    errors.OpExecError.

    """
    raise NotImplementedError
423
def _GetWantedNodes(lu, nodes):
  """Expand and sort a non-empty list of node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: non-empty list of node names to expand
  @rtype: list
  @return: the expanded node names, as ordered by L{utils.NiceSort}
  @raise errors.OpPrereqError: if the nodes parameter is not a list
  @raise errors.ProgrammerError: if the nodes parameter is an empty list

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if len(nodes) == 0:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for short_name in nodes:
    expanded.append(_ExpandNodeName(lu.cfg, short_name))
  return utils.NiceSort(expanded)
447
def _GetWantedInstances(lu, instances):
  """Expand a list of instance names, or list every instance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names; an empty list selects all
      instances known to the configuration
  @rtype: list
  @return: the expanded names in the order given, or, for an empty input
      list, all instance names ordered by L{utils.NiceSort}
  @raise errors.OpPrereqError: if the instances parameter is not a list

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if not instances:
    # Nothing was selected explicitly: fall back to the full instance list
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for short_name in instances:
    expanded.append(_ExpandInstanceName(lu.cfg, short_name))
  return expanded
471
def _CheckOutputFields(static, dynamic, selected):
  """Verify that every selected output field is a known one.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the field names to validate against both sets
  @raise errors.OpPrereqError: if any selected field matches neither set

  """
  known = utils.FieldSet()
  for field_set in (static, dynamic):
    known.Extend(field_set)

  unknown = known.NonMatching(selected)
  if not unknown:
    return

  raise errors.OpPrereqError("Unknown output fields selected: %s"
                             % ",".join(unknown), errors.ECODE_INVAL)
490
def _CheckBooleanOpField(op, name):
  """Validate a boolean opcode parameter.

  After this call the attribute is guaranteed to exist on the opcode and
  to be either a boolean or None; a missing attribute is created as None.

  @param op: the opcode object to check (and possibly update)
  @param name: the name of the attribute to validate
  @raise errors.OpPrereqError: if the value is neither None nor a boolean

  """
  current = getattr(op, name, None)
  if current is not None and not isinstance(current, bool):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(current)), errors.ECODE_INVAL)
  # Written back so that the attribute always exists afterwards
  setattr(op, name, current)
504
def _CheckGlobalHvParams(params):
  """Reject hypervisor parameters that may only be set globally.

  This makes sure instances do not get customised versions of global
  parameters.

  @param params: the hypervisor parameter names to check
  @raise errors.OpPrereqError: if any name is in L{constants.HVC_GLOBALS}

  """
  offending = constants.HVC_GLOBALS.intersection(params)
  if not offending:
    return

  msg = ("The following hypervisor parameters are global and cannot"
         " be customized at instance level, please modify them at"
         " cluster level: %s" % utils.CommaJoin(offending))
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
519
def _CheckNodeOnline(lu, node):
  """Make sure the given node is not marked offline.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  node_info = lu.cfg.GetNodeInfo(node)
  if not node_info.offline:
    return
  raise errors.OpPrereqError("Can't use offline node %s" % node,
                             errors.ECODE_INVAL)
532
def _CheckNodeNotDrained(lu, node):
  """Make sure the given node is not marked drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  node_info = lu.cfg.GetNodeInfo(node)
  if not node_info.drained:
    return
  raise errors.OpPrereqError("Can't use drained node %s" % node,
                             errors.ECODE_INVAL)
545
def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Make sure a node supports the given OS.

  The node is queried through the C{os_get} RPC; unless force_variant is
  set, the OS name is also checked against the variants of the returned
  OS object.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  failure_msg = ("OS '%s' not in supported OS list for node %s" %
                 (os_name, node))
  os_result = lu.rpc.call_os_get(node, os_name)
  os_result.Raise(failure_msg, prereq=True, ecode=errors.ECODE_INVAL)

  if force_variant:
    return
  _CheckOSVariant(os_result.payload, os_name)
563
def _RequireFileStorage():
  """Make sure file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if constants.ENABLE_FILE_STORAGE:
    return
  raise errors.OpPrereqError("File storage disabled at configure time",
                             errors.ECODE_INVAL)
574
def _CheckDiskTemplate(template):
  """Make sure a disk template name is valid and usable.

  @param template: the disk template name to check
  @raise errors.OpPrereqError: if the name is not in
      L{constants.DISK_TEMPLATES}, or if it is the file template while
      file storage is disabled

  """
  known_templates = constants.DISK_TEMPLATES
  if template not in known_templates:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(known_templates)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  # The file template additionally needs file storage support
  if template == constants.DT_FILE:
    _RequireFileStorage()
586
def _CheckStorageType(storage_type):
  """Make sure a storage type is valid and usable.

  @param storage_type: the storage type to check
  @raise errors.OpPrereqError: if the type is not in
      L{constants.VALID_STORAGE_TYPES}, or if it is the file type while
      file storage is disabled

  """
  is_known = storage_type in constants.VALID_STORAGE_TYPES
  if not is_known:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)

  # The file storage type additionally needs file storage support
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()
597
def _CheckInstanceDown(lu, instance, reason):
  """Make sure an instance is neither marked up nor actually running.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance object to check
  @param reason: text appended to the error message
  @raise errors.OpPrereqError: if the instance is marked up, if its primary
      node cannot be queried, or if the node reports the instance running

  """
  inst_name = instance.name

  # Configuration state first: no RPC needed if the instance should be up
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (inst_name, reason), errors.ECODE_STATE)

  # Then ask the primary node what is really running there
  primary = instance.primary_node
  all_results = lu.rpc.call_instance_list([primary], [instance.hypervisor])
  node_result = all_results[primary]
  node_result.Raise("Can't contact node %s for instance information" % primary,
                    prereq=True, ecode=errors.ECODE_ENVIRON)

  if inst_name in node_result.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (inst_name, reason), errors.ECODE_STATE)
614
def _ExpandItemName(fn, name, kind):
  """Resolve an item name to its full form.

  @param fn: the function to use for expansion; it must return None for
      names it cannot resolve
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  resolved = fn(name)
  if resolved is not None:
    return resolved
  raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                             errors.ECODE_NOENT)
631
def _ExpandNodeName(cfg, name):
  """Expand a node name via L{_ExpandItemName}.

  @param cfg: configuration object providing C{ExpandNodeName}
  @param name: the node name to expand
  @return: the resolved (full) node name

  """
  expander = cfg.ExpandNodeName
  return _ExpandItemName(expander, name, "Node")
636
def _ExpandInstanceName(cfg, name):
  """Expand an instance name via L{_ExpandItemName}.

  @param cfg: configuration object providing C{ExpandInstanceName}
  @param name: the instance name to expand
  @return: the resolved (full) instance name

  """
  expander = cfg.ExpandInstanceName
  return _ExpandItemName(expander, name, "Instance")
641
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Assemble the instance-related hook environment from plain values.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  str_status = "down"
  if status:
    str_status = "up"

  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  # One group of variables per NIC, numbered from zero
  nic_count = 0
  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      nic_prefix = "INSTANCE_NIC%d" % idx
      if ip is None:
        # A NIC without an address is exported as an empty string
        ip = ""
      env[nic_prefix + "_IP"] = ip
      env[nic_prefix + "_MAC"] = mac
      env[nic_prefix + "_MODE"] = mode
      env[nic_prefix + "_LINK"] = link
      if mode == constants.NIC_MODE_BRIDGED:
        # Bridged NICs export their link under the _BRIDGE name as well
        env[nic_prefix + "_BRIDGE"] = link
  env["INSTANCE_NIC_COUNT"] = nic_count

  # One group of variables per disk, numbered from zero
  disk_count = 0
  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      disk_prefix = "INSTANCE_DISK%d" % idx
      env[disk_prefix + "_SIZE"] = size
      env[disk_prefix + "_MODE"] = mode
  env["INSTANCE_DISK_COUNT"] = disk_count

  # Backend and hypervisor parameters are exported as they are
  for kind, params in [("BE", bep), ("HV", hvp)]:
    for key, value in params.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
729
def _NICListToTuple(lu, nics):
  """Convert NIC objects into (ip, mac, mode, link) tuples.

  The result is suitable to be passed to _BuildInstanceHookEnv or as a
  return value in LUQueryInstanceData. Mode and link are taken from the
  NIC's own parameters filled on top of the cluster's default NIC
  parameters.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples
  @rtype: list
  @return: one (ip, mac, mode, link) tuple per NIC, in input order

  """
  cluster_defaults = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]

  converted = []
  for nic in nics:
    effective = objects.FillDict(cluster_defaults, nic.nicparams)
    converted.append((nic.ip, nic.mac,
                      effective[constants.NIC_MODE],
                      effective[constants.NIC_LINK]))
  return converted
753
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Assemble the instance-related hook environment from an object.

  The values are extracted from the instance object (with backend and
  hypervisor parameters filled in by the cluster object) and passed on to
  L{_BuildInstanceHookEnv} as keyword arguments.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  filled_be = cluster.FillBE(instance)
  filled_hv = cluster.FillHV(instance)

  kwargs = dict(
    name=instance.name,
    primary_node=instance.primary_node,
    secondary_nodes=instance.secondary_nodes,
    os_type=instance.os,
    status=instance.admin_up,
    memory=filled_be[constants.BE_MEMORY],
    vcpus=filled_be[constants.BE_VCPUS],
    nics=_NICListToTuple(lu, instance.nics),
    disk_template=instance.disk_template,
    disks=[(disk.size, disk.mode) for disk in instance.disks],
    bep=filled_be,
    hvp=filled_hv,
    hypervisor_name=instance.hypervisor,
    )

  # Caller-supplied values take precedence over the computed ones
  if override:
    kwargs.update(override)

  return _BuildInstanceHookEnv(**kwargs) # pylint: disable-msg=W0142
791
def _AdjustCandidatePool(lu, exceptions):
  """Adjust the master candidate pool after node operations.

  Asks the configuration to maintain the candidate pool, logs and re-adds
  any nodes it returns, and logs a note if there are now more candidates
  than desired.

  @param lu: the logical unit on whose behalf we execute
  @param exceptions: passed through to C{MaintainCandidatePool} and
      C{GetMasterCandidateStats}

  """
  promoted = lu.cfg.MaintainCandidatePool(exceptions)
  if promoted:
    promoted_names = utils.CommaJoin(node.name for node in promoted)
    lu.LogInfo("Promoted nodes to master candidate role: %s", promoted_names)
    for node in promoted:
      lu.context.ReaddNode(node)

  (current, desired, _) = lu.cfg.GetMasterCandidateStats(exceptions)
  if current > desired:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (current, desired))
807
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether this node should promote itself to master candidate.

  @param lu: the logical unit on whose behalf we execute
  @param exceptions: passed through to C{GetMasterCandidateStats}
  @rtype: boolean
  @return: True if the current number of candidates is below the desired
      number once the new node is accounted for, capped at the cluster's
      candidate pool size

  """
  pool_size = lu.cfg.GetClusterInfo().candidate_pool_size
  (current, desired, _) = lu.cfg.GetMasterCandidateStats(exceptions)

  # The new node raises the desired count by one, up to the pool size
  target = desired + 1
  if pool_size < target:
    target = pool_size

  return current < target
818
def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  Only NICs whose filled parameters say they are in bridged mode are
  considered; if there is none, no RPC call is made.

  @param lu: the logical unit on whose behalf we execute
  @param target_nics: the NIC objects to check
  @param target_node: the node on which the bridges must exist
  @param profile: key of the cluster NIC parameter set used as defaults
  @raise errors.OpPrereqError: if the bridge check fails on the node

  """
  cluster_defaults = lu.cfg.GetClusterInfo().nicparams[profile]

  bridges = []
  for nic in target_nics:
    effective = objects.FillDict(cluster_defaults, nic.nicparams)
    if effective[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.append(effective[constants.NIC_LINK])

  if not bridges:
    return

  check = lu.rpc.call_bridges_exist(target_node, bridges)
  check.Raise("Error checking bridges on destination node '%s'" %
              target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
834
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose NICs are checked
  @param node: the node to check on; defaults to the instance's primary
      node when None

  """
  target_node = node
  if target_node is None:
    target_node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, target_node)
843
def _CheckOSVariant(os_obj, name):
  """Check that an OS name conforms to the OS variants specification.

  Nothing is checked for an OS that declares no supported variants.
  Otherwise the name must have the form C{<os>+<variant>}, and the variant
  (everything after the first "+") must be a supported one.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity
  @raise errors.OpPrereqError: if the variant is missing or unsupported

  """
  supported = os_obj.supported_variants
  if not supported:
    return

  pieces = name.split("+", 1)
  if len(pieces) < 2:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if pieces[1] not in supported:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
864
def _GetNodeInstancesInner(cfg, fn):
  """Return the instance objects for which a predicate holds.

  @param cfg: configuration object providing C{GetAllInstancesInfo}
  @param fn: predicate called with each instance object
  @rtype: list
  @return: the instance objects for which fn returned a true value

  """
  selected = []
  for inst in cfg.GetAllInstancesInfo().values():
    if fn(inst):
      selected.append(inst)
  return selected
868
def _GetNodeInstances(cfg, node_name):
  """Return all instances using a node, as primary or as secondary.

  @param cfg: configuration object
  @param node_name: the node to look for
  @rtype: list
  @return: the instance objects whose C{all_nodes} contains the node

  """
  def _UsesNode(inst):
    return node_name in inst.all_nodes

  return _GetNodeInstancesInner(cfg, _UsesNode)
876
def _GetNodePrimaryInstances(cfg, node_name):
  """Return the instances that have a node as their primary node.

  @param cfg: configuration object
  @param node_name: the node to look for
  @rtype: list
  @return: the instance objects whose C{primary_node} equals the node

  """
  def _IsPrimary(inst):
    return node_name == inst.primary_node

  return _GetNodeInstancesInner(cfg, _IsPrimary)
884
def _GetNodeSecondaryInstances(cfg, node_name):
  """Return the instances that have a node among their secondaries.

  @param cfg: configuration object
  @param node_name: the node to look for
  @rtype: list
  @return: the instance objects whose C{secondary_nodes} contains the node

  """
  def _IsSecondary(inst):
    return node_name in inst.secondary_nodes

  return _GetNodeInstancesInner(cfg, _IsSecondary)
892
def _GetStorageTypeArgs(cfg, storage_type):
  """Return the extra arguments needed for a storage type.

  @param cfg: configuration object providing C{GetFileStorageDir}
  @param storage_type: the storage type being used
  @rtype: list
  @return: for file storage, a one-element list holding the list of
      storage directories; an empty list for any other type

  """
  args = []

  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    args.append([cfg.GetFileStorageDir()])

  return args
904
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  """Return the indices of an instance's disks that are faulty on a node.

  @param cfg: configuration object, used to set the disk IDs for the node
  @param rpc: RPC runner used to query the mirror status
  @param instance: the instance whose disks are examined
  @param node_name: the node to query
  @param prereq: passed on to the RPC result's C{Raise} method
  @rtype: list
  @return: the indices of the payload entries whose C{ldisk_status} is
      L{constants.LDS_FAULTY}

  """
  disks = instance.disks
  for disk in disks:
    cfg.SetDiskID(disk, node_name)

  status = rpc.call_blockdev_getmirrorstatus(node_name, disks)
  status.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  # Entries that are false (e.g. None) carry no status and are skipped
  return [idx for (idx, disk_status) in enumerate(status.payload)
          if disk_status and disk_status.ldisk_status == constants.LDS_FAULTY]
921
def _FormatTimestamp(secs):
  """Formats a Unix timestamp with the local timezone.

  @param secs: seconds since the epoch
  @rtype: string
  @return: the timestamp as "YYYY-MM-DD HH:MM:SS ZONE" in local time

  """
  # time.localtime is required here: %Z prints the name of the local zone,
  # so a struct from time.gmtime would yield UTC time labelled with the
  # local zone name. The explicit directives are equivalent to "%F %T" and
  # do not depend on platform-specific strftime extensions.
  return time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(secs))
928
class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  The LU does no work of its own; it exists so that the "cluster-init"
  hooks get run.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: the environment (with the cluster name as OP_TARGET), an empty
        pre-execution node list, and the master node as the only
        post-execution node

    """
    master_node = self.cfg.GetMasterNode()
    hook_env = {"OP_TARGET": self.cfg.GetClusterName()}
    return hook_env, [], [master_node]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True
957
class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: the environment (with the cluster name as OP_TARGET) and empty
        pre- and post-execution node lists

    """
    hook_env = {"OP_TARGET": self.cfg.GetClusterName()}
    return hook_env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    The cluster must be empty: the master must be the only node left and
    there must be no instances.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    only_master_left = len(nodelist) == 1 and nodelist[0] == master
    if not only_master_left:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)

    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Runs the post hooks on the master, disables the master role and, if
    the cluster manages the SSH setup, backs up the SSH key files.

    @return: the name of the master node

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hooks_master = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hooks_master.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # Hook failures only produce a warning and never stop the destruction
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    stop_result = self.rpc.call_node_stop_master(master, False)
    stop_result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      for key_file in (priv_key, pub_key):
        utils.CreateBackup(key_file)

    return master
1019
def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
  """Evaluates certificate validity details for LUVerifyCluster.

  @param filename: certificate path, only used in the messages
  @param expired: whether the certificate is reported as expired
  @param not_before: start of validity as a timestamp, or None if unknown
  @param not_after: end of validity as a timestamp, or None if unknown
  @param now: the current time as a timestamp
  @param warn_days: remaining days at or below which a warning is returned
  @param error_days: remaining days at or below which an error is returned
  @return: tuple of (error type, message); C{(None, None)} when there is
      nothing to report

  """
  has_start = not_before is not None
  has_end = not_after is not None

  if expired:
    if has_start and has_end:
      validity = (" (valid from %s to %s)" %
                  (_FormatTimestamp(not_before),
                   _FormatTimestamp(not_after)))
    elif has_start:
      validity = " (valid from %s)" % _FormatTimestamp(not_before)
    elif has_end:
      validity = " (valid until %s)" % _FormatTimestamp(not_after)
    else:
      validity = ""

    return (LUVerifyCluster.ETYPE_ERROR,
            ("Certificate %s is expired" % filename) + validity)

  if has_start and not_before > now:
    return (LUVerifyCluster.ETYPE_WARNING,
            "Certificate %s not yet valid (valid from %s)" %
            (filename, _FormatTimestamp(not_before)))

  if has_end:
    # int() truncates towards zero, partial days are not counted
    remaining_days = int((not_after - now) / (24 * 3600))

    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))

    # The error threshold is checked first so that it wins over the warning
    for (limit, severity) in [(error_days, LUVerifyCluster.ETYPE_ERROR),
                              (warn_days, LUVerifyCluster.ETYPE_WARNING)]:
      if remaining_days <= limit:
        return (severity, msg)

  return (None, None)
1058
1059 1060 -def _VerifyCertificate(filename):
1061 """Verifies a certificate for LUVerifyCluster. 1062 1063 @type filename: string 1064 @param filename: Path to PEM file 1065 1066 """ 1067 try: 1068 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, 1069 utils.ReadFile(filename)) 1070 except Exception, err: # pylint: disable-msg=W0703 1071 return (LUVerifyCluster.ETYPE_ERROR, 1072 "Failed to load X509 certificate %s: %s" % (filename, err)) 1073 1074 # Depending on the pyOpenSSL version, this can just return (None, None) 1075 (not_before, not_after) = utils.GetX509CertValidity(cert) 1076 1077 return _VerifyCertificateInner(filename, cert.has_expired(), 1078 not_before, not_after, time.time())
1079
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  Every C{E*} attribute below is an C{(item type, error code)} pair as
  unpacked by the error formatting method of this class. The C{ETYPE_*}
  values are the severities, selected by callers through the keyword
  argument named by C{ETYPE_FIELD}.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  # Kinds of items an error can refer to
  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  # Cluster-level errors
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")

  # Instance-level errors (EINSTANCEMISSINGDISK used to be assigned twice
  # with the same value; the redundant assignment has been dropped)
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")

  # Node-level errors
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  # Name of the severity keyword argument and its possible values
  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

class NodeImage(object):
  """The logical (configured) and physical (runtime) state of one node.

  @ivar volumes: a structure as returned from
      L{ganeti.backend.GetVolumeList} (runtime)
  @ivar instances: a list of running instances (runtime)
  @ivar pinst: list of configured primary instances (config)
  @ivar sinst: list of configured secondary instances (config)
  @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
      of this node (config)
  @ivar mfree: free memory, as reported by hypervisor (runtime)
  @ivar dfree: free disk, as reported by the node (runtime)
  @ivar offline: the offline status (config)
  @type rpc_fail: boolean
  @ivar rpc_fail: whether the RPC verify call failed (overall,
      not whether the individual keys were correct) (runtime)
  @type lvm_fail: boolean
  @ivar lvm_fail: whether the RPC call didn't return valid LVM data
  @type hyp_fail: boolean
  @ivar hyp_fail: whether the RPC call didn't return the instance list
  @type ghost: boolean
  @ivar ghost: whether this node is unknown to the configuration (config)

  """
  def __init__(self, offline=False):
    # Data taken from the configuration
    self.offline = offline
    self.ghost = False
    self.pinst = []
    self.sinst = []
    self.sbp = {}

    # Data filled in from the node's answers
    self.volumes = {}
    self.instances = []
    self.mfree = 0
    self.dfree = 0

    # Failure flags, all cleared until a check says otherwise
    self.rpc_fail = False
    self.lvm_fail = False
    self.hyp_fail = False
1158
def ExpandNames(self):
  """Declares the locks needed: all nodes and all instances, all shared.

  """
  full_levels = [locking.LEVEL_NODE, locking.LEVEL_INSTANCE]
  self.needed_locks = dict((level, locking.ALL_SET) for level in full_levels)
  # A value of 1 is stored for every locking level
  self.share_locks = dict((level, 1) for level in locking.LEVELS)
1165
def _Error(self, ecode, item, msg, *args, **kwargs):
  """Formats an error message and sends it to the feedback function.

  Depending on the opcode's error_codes parameter the message is either
  a colon-separated, parseable record or a simpler human-readable line.

  This must be called only from Exec and functions called from Exec.

  @param ecode: tuple of (item type, error code)
  @param item: name of the affected item, may be empty or None
  @param msg: the message, optionally a format string for C{args}
  @keyword code: the severity, defaulting to C{ETYPE_ERROR}

  """
  severity = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
  (itype, etxt) = ecode

  # Interpolation only happens when positional arguments were given
  text = msg
  if args:
    text = text % args

  if self.op.error_codes:
    line = "%s:%s:%s:%s:%s" % (severity, etxt, itype, item, text)
  else:
    item_part = ""
    if item:
      item_part = " " + item
    line = "%s: %s%s: %s" % (severity, itype, item_part, text)

  self._feedback_fn(" - %s" % line)
1191
def _ErrorIf(self, cond, *args, **kwargs):
  """Reports an error if the condition holds or errors are simulated.

  The remaining arguments are handed to L{_Error} unchanged. Unless the
  severity keyword selects something other than C{ETYPE_ERROR}, the
  outcome is also folded into C{self.bad}.

  @param cond: the condition; only its truth value is used

  """
  triggered = bool(cond) or self.op.debug_simulate_errors
  if triggered:
    self._Error(*args, **kwargs)

  severity = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
  if severity != self.ETYPE_ERROR:
    # do not mark the operation as failed for WARN cases only
    return

  self.bad = self.bad or triggered
1202
def _VerifyNode(self, ninfo, nresult):
  """Runs the basic sanity checks against a node's verify results.

  Checks, in order:

    - that the node returned a non-empty dictionary
    - the protocol version (a mismatch aborts the remaining checks)
    - the release version (a mismatch is only a warning)
    - the per-hypervisor verification results
    - the node setup results

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the results from the node
  @rtype: boolean
  @return: whether overall this call was successful (and we can expect
       reasonable values in the response)

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  # main result, nresult should be a non-empty dict
  no_data = not (nresult and isinstance(nresult, dict))
  _ErrorIf(no_data, self.ENODERPC, node,
           "unable to verify node: no data returned")
  if no_data:
    return False

  # the version is expected as a (protocol, release) pair
  local_version = constants.PROTOCOL_VERSION
  remote_version = nresult.get("version", None)
  bad_version = (not remote_version or
                 not isinstance(remote_version, (list, tuple)) or
                 len(remote_version) != 2)
  _ErrorIf(bad_version, self.ENODERPC, node,
           "connection to node returned invalid data")
  if bad_version:
    return False

  (remote_protocol, remote_release) = (remote_version[0], remote_version[1])

  incompatible = local_version != remote_protocol
  _ErrorIf(incompatible, self.ENODEVERSION, node,
           "incompatible protocol versions: master %s,"
           " node %s", local_version, remote_protocol)
  if incompatible:
    return False

  # node seems compatible, we can actually try to look into its results

  # full package version
  self._ErrorIf(constants.RELEASE_VERSION != remote_release,
                self.ENODEVERSION, node,
                "software version mismatch: master %s, node %s",
                constants.RELEASE_VERSION, remote_release,
                code=self.ETYPE_WARNING)

  # any hypervisor result other than None is reported as a failure
  hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
  if isinstance(hyp_result, dict):
    for hv_name, hv_result in hyp_result.iteritems():
      hv_failed = hv_result is not None
      _ErrorIf(hv_failed, self.ENODEHV, node,
               "hypervisor %s verify failure: '%s'", hv_name, hv_result)

  # a missing key is reported through the default list of messages
  setup_errors = nresult.get(constants.NV_NODESETUP,
                             ["Missing NODESETUP results"])
  _ErrorIf(setup_errors, self.ENODESETUP, node, "node setup error: %s",
           "; ".join(setup_errors))

  return True
1272
def _VerifyNodeTime(self, ninfo, nresult,
                    nvinfo_starttime, nvinfo_endtime):
  """Checks that the node's clock is close enough to the master's.

  The node time must lie within the window spanned by the start and end
  of the RPC call, widened by C{constants.NODE_MAX_CLOCK_SKEW} on both
  sides.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nvinfo_starttime: the start time of the RPC call
  @param nvinfo_endtime: the end time of the RPC call

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  reported = nresult.get(constants.NV_TIME, None)
  try:
    node_time = utils.MergeTime(reported)
  except (ValueError, TypeError):
    _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
    return

  earliest = nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW
  latest = nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW

  # The divergence is measured against the nearest edge of the RPC window
  divergence = None
  if node_time < earliest:
    divergence = "%.01fs" % abs(nvinfo_starttime - node_time)
  elif node_time > latest:
    divergence = "%.01fs" % abs(node_time - nvinfo_endtime)

  _ErrorIf(divergence is not None, self.ENODETIME, node,
           "Node time diverges by at least %s from master node time",
           divergence)
1304
def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
  """Checks the LVM data (volume groups and PVs) returned by a node.

  Nothing is checked when no volume group is configured.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param vg_name: the configured VG name

  """
  if vg_name is None:
    return

  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  # the VG must exist and be at least constants.MIN_VG_SIZE in size
  vglist = nresult.get(constants.NV_VGLIST, None)
  no_vglist = not vglist
  _ErrorIf(no_vglist, self.ENODELVM, node, "unable to check volume groups")
  if not no_vglist:
    vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                          constants.MIN_VG_SIZE)
    # the status doubles as condition and as message
    _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

  # check pv names
  pvlist = nresult.get(constants.NV_PVLIST, None)
  no_pvlist = pvlist is None
  _ErrorIf(no_pvlist, self.ENODELVM, node, "Can't get PV list from node")
  if no_pvlist:
    return

  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for _, pvname, owner_vg in pvlist:
    has_colon = ":" in pvname
    _ErrorIf(has_colon, self.ENODELVM, node, "Invalid character ':' in PV"
             " '%s' of VG '%s'", pvname, owner_vg)
1341
def _VerifyNodeNetwork(self, ninfo, nresult):
  """Checks the connectivity data returned by a node.

  Covers the ssh connectivity list, the tcp connectivity test and the
  reachability of the master IP.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  # ssh: every entry returned is a failure towards another node
  ssh_missing = constants.NV_NODELIST not in nresult
  _ErrorIf(ssh_missing, self.ENODESSH, node,
           "node hasn't returned node ssh connectivity data")
  if not ssh_missing:
    ssh_failures = nresult[constants.NV_NODELIST]
    if ssh_failures:
      for a_node, a_msg in ssh_failures.items():
        _ErrorIf(True, self.ENODESSH, node,
                 "ssh communication with node '%s': %s", a_node, a_msg)

  # tcp: same layout, reported in sorted node order
  tcp_missing = constants.NV_NODENETTEST not in nresult
  _ErrorIf(tcp_missing, self.ENODENET, node,
           "node hasn't returned node tcp connectivity data")
  if not tcp_missing:
    tcp_failures = nresult[constants.NV_NODENETTEST]
    if tcp_failures:
      for anode in utils.NiceSort(tcp_failures.keys()):
        _ErrorIf(True, self.ENODENET, node,
                 "tcp communication with node '%s': %s",
                 anode, tcp_failures[anode])

  # master IP: a false value means it could not be reached
  ip_missing = constants.NV_MASTERIP not in nresult
  _ErrorIf(ip_missing, self.ENODENET, node,
           "node hasn't returned node master IP reachability data")
  if not ip_missing and not nresult[constants.NV_MASTERIP]:
    if node == self.master_node:
      msg = "the master node cannot reach the master IP (not configured?)"
    else:
      msg = "cannot reach the master IP"
    _ErrorIf(True, self.ENODENET, node, msg)
1383 1384
def _VerifyInstance(self, instance, instanceconfig, node_image):
  """Verifies a single instance.

  Checks that the logical volumes of the instance exist on its nodes,
  that an instance marked up is running on its primary node, and that it
  is not running on any other node.

  @param instance: the instance name
  @param instanceconfig: the instance's configuration object
  @param node_image: dict of node name to L{NodeImage}

  """
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
  primary = instanceconfig.primary_node

  expected_lvs = {}
  instanceconfig.MapLVsByNode(expected_lvs)

  for node, volumes in expected_lvs.items():
    n_img = node_image[node]
    healthy = not (n_img.offline or n_img.rpc_fail or n_img.lvm_fail)
    if not healthy:
      # ignore missing volumes on offline or broken nodes
      continue
    for volume in volumes:
      missing = volume not in n_img.volumes
      _ErrorIf(missing, self.EINSTANCEMISSINGDISK, instance,
               "volume %s missing on node %s", volume, node)

  if instanceconfig.admin_up:
    pri_img = node_image[primary]
    # an offline primary cannot tell us whether the instance runs
    not_running = instance not in pri_img.instances and not pri_img.offline
    _ErrorIf(not_running, self.EINSTANCEDOWN, instance,
             "instance not running on its primary node %s",
             primary)

  for node, n_img in node_image.items():
    if node == primary:
      continue
    misplaced = instance in n_img.instances
    _ErrorIf(misplaced, self.EINSTANCEWRONGNODE, instance,
             "instance should not run on node %s", node)
1420
def _VerifyOrphanVolumes(self, node_vol_should, node_image):
  """Reports volumes found on nodes that no instance accounts for.

  Nodes that are offline or whose RPC or LVM data could not be retrieved
  are skipped.

  @param node_vol_should: dict of node name to the volumes expected there
  @param node_image: dict of node name to L{NodeImage}

  """
  for node, n_img in node_image.items():
    unhealthy = n_img.offline or n_img.rpc_fail or n_img.lvm_fail
    if unhealthy:
      # skip non-healthy nodes
      continue
    for volume in n_img.volumes:
      if node in node_vol_should:
        unknown = volume not in node_vol_should[node]
      else:
        unknown = True
      self._ErrorIf(unknown, self.ENODEORPHANLV, node,
                    "volume %s is unknown", volume)
1437
def _VerifyOrphanInstances(self, instancelist, node_image):
  """Reports running instances that are unknown to the cluster.

  @param instancelist: the names of the configured instances
  @param node_image: dict of node name to L{NodeImage}

  """
  running = [(node, name)
             for (node, n_img) in node_image.items()
             for name in n_img.instances]
  for (node, name) in running:
    orphan = name not in instancelist
    self._ErrorIf(orphan, self.ENODEORPHANINSTANCE, node,
                  "instance %s on node %s should not exist", name, node)
1449
def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
  """Verify N+1 Memory Resilience.

  Check that if one single node dies we can still start all the
  instances it was primary for.

  @param node_image: dict of node name to L{NodeImage}; the C{sbp} and
      C{mfree} attributes are used
  @param instance_cfg: dict of instance name to instance configuration

  """
  # The cluster object does not depend on the loop variables, so it is
  # fetched once instead of once per instance
  cluster = self.cfg.GetClusterInfo()

  for node, n_img in node_image.items():
    # This code checks that every node which is now listed as
    # secondary has enough memory to host all instances it is
    # supposed to should a single other node in the cluster fail.
    # FIXME: not ready for failover to an arbitrary node
    # FIXME: does not support file-backed instances
    # WARNING: we currently take into account down instances as well
    # as up ones, considering that even if they're down someone
    # might want to start them even in the event of a node failure.
    for prinode, instances in n_img.sbp.items():
      needed_mem = 0
      for instance in instances:
        bep = cluster.FillBE(instance_cfg[instance])
        # only auto-balanced instances count towards the requirement
        if bep[constants.BE_AUTO_BALANCE]:
          needed_mem += bep[constants.BE_MEMORY]
      test = n_img.mfree < needed_mem
      self._ErrorIf(test, self.ENODEN1, node,
                    "not enough memory to accommodate"
                    " failovers should peer node %s fail", prinode)
1476
def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                     master_files):
  """Compares the node's file checksums with the local ones.

  Files listed in C{master_files} are only required on master
  candidates; on other nodes their presence is reported as an error.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param file_list: required list of files
  @param local_cksum: dictionary of local files and their checksums
  @param master_files: list of files that only masters should have

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  remote_cksum = nresult.get(constants.NV_FILELIST, None)
  no_data = not isinstance(remote_cksum, dict)
  _ErrorIf(no_data, self.ENODEFILECHECK, node,
           "node hasn't returned file checksum data")
  if no_data:
    return

  node_is_mc = ninfo.master_candidate

  for file_name in file_list:
    must_have = (file_name not in master_files) or node_is_mc

    missing = file_name not in remote_cksum
    if missing:
      outdated = False
      intact = False
    else:
      outdated = remote_cksum[file_name] != local_cksum[file_name]
      intact = remote_cksum[file_name] == local_cksum[file_name]

    _ErrorIf(missing and must_have, self.ENODEFILECHECK, node,
             "file '%s' missing", file_name)
    _ErrorIf(outdated and must_have, self.ENODEFILECHECK, node,
             "file '%s' has wrong checksum", file_name)
    # not candidate and this is not a must-have file
    _ErrorIf(outdated and not must_have, self.ENODEFILECHECK, node,
             "file '%s' should not exist on non master"
             " candidates (and the file is outdated)", file_name)
    # all good, except non-master/non-must have combination
    _ErrorIf(intact and not must_have, self.ENODEFILECHECK, node,
             "file '%s' should not exist"
             " on non master candidates", file_name)
1520
def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
  """Verifies the node's DRBD minors against the configuration.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param instanceinfo: the dict of instances
  @param drbd_map: the DRBD map as returned by
      L{ganeti.config.ConfigWriter.ComputeDRBDMap}

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  # expected minors: minor -> (instance name, whether it must be active)
  expected = {}
  for minor, iname in drbd_map[node].items():
    is_ghost = iname not in instanceinfo
    _ErrorIf(is_ghost, self.ECLUSTERCFG, None,
             "ghost instance '%s' in temporary DRBD map", iname)
    if is_ghost:
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      expected[minor] = (iname, False)
      continue
    inst = instanceinfo[iname]
    expected[minor] = (inst.name, inst.admin_up)

  # and now check them
  used_minors = nresult.get(constants.NV_DRBDLIST, [])
  unparsable = not isinstance(used_minors, (tuple, list))
  _ErrorIf(unparsable, self.ENODEDRBD, node,
           "cannot parse drbd status file: %s", str(used_minors))
  if unparsable:
    # we cannot check drbd status
    return

  for minor, (iname, must_exist) in expected.items():
    inactive = minor not in used_minors and must_exist
    _ErrorIf(inactive, self.ENODEDRBD, node,
             "drbd minor %d of instance %s is not active", minor, iname)

  for minor in used_minors:
    unallocated = minor not in expected
    _ErrorIf(unallocated, self.ENODEDRBD, node,
             "unallocated drbd minor %d is in use", minor)
1567
def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
  """Verifies the node's volume data and stores it in the node image.

  C{nimg.lvm_fail} is set first and only cleared once a dictionary of
  volumes has been stored in C{nimg.volumes}; it therefore also stays set
  when no volume group is configured.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object
  @param vg_name: the configured VG name

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  nimg.lvm_fail = True
  lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")

  if vg_name is None:
    return

  if isinstance(lvdata, basestring):
    # a string result is an error message
    _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
             utils.SafeEncode(lvdata))
    return

  if not isinstance(lvdata, dict):
    _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    return

  nimg.volumes = lvdata
  nimg.lvm_fail = False
1596
def _UpdateNodeInstances(self, ninfo, nresult, nimg):
  """Verifies the node's instance list and stores it in the node image.

  If the node returned a list it becomes C{nimg.instances}; otherwise
  C{nimg.hyp_fail} is set.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object

  """
  idata = nresult.get(constants.NV_INSTANCELIST, None)
  listing_ok = isinstance(idata, list)
  self._ErrorIf(not listing_ok, self.ENODEHV, ninfo.name,
                "rpc call to node failed"
                " (instancelist): %s", utils.SafeEncode(str(idata)))
  if listing_ok:
    nimg.instances = idata
  else:
    nimg.hyp_fail = True
1618
def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
  """Stores the node's free memory and free disk in the node image.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object
  @param vg_name: the configured VG name

  """
  node = ninfo.name
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

  # try to read free memory (from the hypervisor)
  hv_info = nresult.get(constants.NV_HVINFO, None)
  have_mem = isinstance(hv_info, dict) and "memory_free" in hv_info
  _ErrorIf(not have_mem, self.ENODEHV, node,
           "rpc call to node failed (hvinfo)")
  if have_mem:
    try:
      nimg.mfree = int(hv_info["memory_free"])
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODERPC, node,
               "node returned invalid nodeinfo, check hypervisor")

  # FIXME: devise a free space model for file based instances as well
  if vg_name is None:
    return

  have_vg = (constants.NV_VGLIST in nresult and
             vg_name in nresult[constants.NV_VGLIST])
  _ErrorIf(not have_vg, self.ENODELVM, node,
           "node didn't return data for the volume group '%s'"
           " - it is either missing or broken", vg_name)
  if not have_vg:
    return

  try:
    nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
  except (ValueError, TypeError):
    _ErrorIf(True, self.ENODERPC, node,
             "node returned invalid LVM info, check LVM status")
1656
def CheckPrereq(self):
  """Check prerequisites.

  Stores the checks to be skipped as a frozenset in C{self.skip_set} and
  makes sure that all of them are known optional checks.

  @raise errors.OpPrereqError: if an unknown check was requested to be
      skipped

  """
  requested = frozenset(self.op.skip_checks)
  self.skip_set = requested

  if constants.VERIFY_OPTIONAL_CHECKS.issuperset(requested):
    return

  raise errors.OpPrereqError("Invalid checks to be skipped specified",
                             errors.ECODE_INVAL)
1668
def BuildHooksEnv(self):
  """Build hooks env.

  Cluster-Verify hooks are only run in the post phase; their failure is
  logged in the verify output and makes the verification fail.

  @return: tuple of (environment, empty list, list of all node names);
      the environment holds the cluster tags and one C{NODE_TAGS_<name>}
      entry per node

  """
  all_nodes = self.cfg.GetNodeList()

  cluster_tags = self.cfg.GetClusterInfo().GetTags()
  env = {"CLUSTER_TAGS": " ".join(cluster_tags)}

  node_tags = [("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.cfg.GetAllNodesInfo().values()]
  env.update(node_tags)

  return env, [], all_nodes
1684
def Exec(self, feedback_fn):
  """Verify integrity of cluster, performing various test on nodes.

  The method works in phases: it checks the configuration and the
  certificates, builds the expected cluster state from the configuration,
  gathers the runtime data from all nodes with a single node_verify RPC
  call, runs the per-node and per-instance checks and finally the
  cluster-wide checks (orphan volumes, orphan instances, N+1 memory).

  @param feedback_fn: function used to send progress and error lines to
      the caller; it is also stored for use by the error helpers
  @rtype: boolean
  @return: True if no check flagged an error (warnings do not count)

  """
  self.bad = False
  _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
  verbose = self.op.verbose
  # the error helpers report through this attribute
  self._feedback_fn = feedback_fn
  feedback_fn("* Verifying global settings")
  for msg in self.cfg.VerifyConfig():
    _ErrorIf(True, self.ECLUSTERCFG, None, msg)

  # Check the cluster certificates
  for cert_filename in constants.ALL_CERT_FILES:
    # errcode is None when there is nothing to report, otherwise it is
    # the severity and is passed on as such
    (errcode, msg) = _VerifyCertificate(cert_filename)
    _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

  vg_name = self.cfg.GetVGName()
  hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
  cluster = self.cfg.GetClusterInfo()
  nodelist = utils.NiceSort(self.cfg.GetNodeList())
  nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
  instancelist = utils.NiceSort(self.cfg.GetInstanceList())
  instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                      for iname in instancelist)
  i_non_redundant = [] # Non redundant instances
  i_non_a_balanced = [] # Non auto-balanced instances
  n_offline = 0 # Count of offline nodes
  n_drained = 0 # Count of nodes being drained
  node_vol_should = {}

  # FIXME: verify OS list
  # do local checksums
  master_files = [constants.CLUSTER_CONF_FILE]
  master_node = self.master_node = self.cfg.GetMasterNode()
  master_ip = self.cfg.GetMasterIP()

  file_names = ssconf.SimpleStore().GetFileList()
  file_names.extend(constants.ALL_CERT_FILES)
  file_names.extend(master_files)
  if cluster.modify_etc_hosts:
    file_names.append(constants.ETC_HOSTS)

  local_checksums = utils.FingerprintFiles(file_names)

  feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
  # parameters of the node_verify RPC call, keyed by check name
  node_verify_param = {
    constants.NV_FILELIST: file_names,
    constants.NV_NODELIST: [node.name for node in nodeinfo
                            if not node.offline],
    constants.NV_HYPERVISOR: hypervisors,
    constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                node.secondary_ip) for node in nodeinfo
                               if not node.offline],
    constants.NV_INSTANCELIST: hypervisors,
    constants.NV_VERSION: None,
    constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    constants.NV_NODESETUP: None,
    constants.NV_TIME: None,
    constants.NV_MASTERIP: (master_node, master_ip),
    }

  # the LVM and DRBD checks are only requested when a VG is configured
  # NOTE(review): block nesting was reconstructed from a flattened listing;
  # confirm NV_DRBDLIST belongs inside this "if"
  if vg_name is not None:
    node_verify_param[constants.NV_VGLIST] = None
    node_verify_param[constants.NV_LVLIST] = vg_name
    node_verify_param[constants.NV_PVLIST] = [vg_name]
    node_verify_param[constants.NV_DRBDLIST] = None

  # Build our expected cluster state
  node_image = dict((node.name, self.NodeImage(offline=node.offline))
                    for node in nodeinfo)

  for instance in instancelist:
    inst_config = instanceinfo[instance]

    for nname in inst_config.all_nodes:
      if nname not in node_image:
        # ghost node
        gnode = self.NodeImage()
        gnode.ghost = True
        node_image[nname] = gnode

    # accumulates the expected volumes of all instances, per node
    inst_config.MapLVsByNode(node_vol_should)

    pnode = inst_config.primary_node
    node_image[pnode].pinst.append(instance)

    for snode in inst_config.secondary_nodes:
      nimg = node_image[snode]
      nimg.sinst.append(instance)
      # group this secondary's instances by their primary node
      if pnode not in nimg.sbp:
        nimg.sbp[pnode] = []
      nimg.sbp[pnode].append(instance)

  # At this point, we have the in-memory data structures complete,
  # except for the runtime information, which we'll gather next

  # Due to the way our RPC system works, exact response times cannot be
  # guaranteed (e.g. a broken node could run into a timeout). By keeping the
  # time before and after executing the request, we can at least have a time
  # window.
  nvinfo_starttime = time.time()
  all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                         self.cfg.GetClusterName())
  nvinfo_endtime = time.time()

  all_drbd_map = self.cfg.ComputeDRBDMap()

  feedback_fn("* Verifying node status")
  for node_i in nodeinfo:
    node = node_i.name
    nimg = node_image[node]

    if node_i.offline:
      if verbose:
        feedback_fn("* Skipping offline node %s" % (node,))
      n_offline += 1
      continue

    # the first matching role wins, so a drained node that is also the
    # master or a master candidate is not counted in n_drained
    if node == master_node:
      ntype = "master"
    elif node_i.master_candidate:
      ntype = "master candidate"
    elif node_i.drained:
      ntype = "drained"
      n_drained += 1
    else:
      ntype = "regular"
    if verbose:
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

    msg = all_nvinfo[node].fail_msg
    _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
    if msg:
      # nothing else can be checked for this node
      nimg.rpc_fail = True
      continue

    nresult = all_nvinfo[node].payload

    nimg.call_ok = self._VerifyNode(node_i, nresult)
    self._VerifyNodeNetwork(node_i, nresult)
    self._VerifyNodeLVM(node_i, nresult, vg_name)
    self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                          master_files)
    self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
    self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)

    # fill the node image with the runtime data for the later checks
    self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
    self._UpdateNodeInstances(node_i, nresult, nimg)
    self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)

  feedback_fn("* Verifying instance status")
  for instance in instancelist:
    if verbose:
      feedback_fn("* Verifying instance %s" % instance)
    inst_config = instanceinfo[instance]
    self._VerifyInstance(instance, inst_config, node_image)
    inst_nodes_offline = []

    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
             self.ENODERPC, pnode, "instance %s, connection to"
             " primary node failed", instance)

    if pnode_img.offline:
      inst_nodes_offline.append(pnode)

    # If the instance is non-redundant we cannot survive losing its primary
    # node, so we are not N+1 compliant. On the other hand we have no disk
    # templates with more than one secondary so that situation is not well
    # supported either.
    # FIXME: does not support file-backed instances
    if not inst_config.secondary_nodes:
      i_non_redundant.append(instance)
    _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
             instance, "instance has multiple secondary nodes: %s",
             utils.CommaJoin(inst_config.secondary_nodes),
             code=self.ETYPE_WARNING)

    if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
      i_non_a_balanced.append(instance)

    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
               "instance %s, connection to secondary node failed", instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
             "instance lives on offline node(s) %s",
             utils.CommaJoin(inst_nodes_offline))
    # ... or ghost nodes
    for node in inst_config.all_nodes:
      _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
               "instance lives on ghost node %s", node)

  feedback_fn("* Verifying orphan volumes")
  self._VerifyOrphanVolumes(node_vol_should, node_image)

  feedback_fn("* Verifying orphan instances")
  self._VerifyOrphanInstances(instancelist, node_image)

  # the N+1 check is the only one that can be skipped here
  if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
    feedback_fn("* Verifying N+1 Memory redundancy")
    self._VerifyNPlusOneMemory(node_image, instanceinfo)

  # the notices below are informational and do not affect the result
  feedback_fn("* Other Notes")
  if i_non_redundant:
    feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                % len(i_non_redundant))

  if i_non_a_balanced:
    feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                % len(i_non_a_balanced))

  if n_offline:
    feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)

  if n_drained:
    feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)

  return not self.bad
1911
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
  """Analyze the post-hooks' result

  This method analyses the hook result, handles it, and sends some
  nicely-formatted feedback back to the user.

  Only the POST phase is inspected; for any other phase C{lu_result} is
  returned untouched. A failed hook script or a communication failure
  with a node that is not marked offline turns the result into a failure
  (C{0}); offline nodes are skipped and never change the result.

  @param phase: one of L{constants.HOOKS_PHASE_POST} or
      L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
  @param hooks_results: the results of the multi-node hooks rpc call
  @param feedback_fn: function used send feedback back to the caller
  @param lu_result: previous Exec result
  @return: the new Exec result, based on the previous result
      and hook results

  """
  # We only really run POST phase hooks, and are only interested in
  # their results
  if phase != constants.HOOKS_PHASE_POST:
    return lu_result

  # Used to change hooks' output to proper indentation
  # NOTE(review): the replacement string below is reproduced as it appears
  # in the extracted text (a single space); the extraction may have
  # collapsed a longer run of spaces -- confirm against the real file
  indent_re = re.compile('^', re.M)
  feedback_fn("* Hooks Results")
  assert hooks_results, "invalid result from hooks"

  for node_name in hooks_results:
    res = hooks_results[node_name]
    msg = res.fail_msg
    test = msg and not res.offline
    self._ErrorIf(test, self.ENODEHOOKS, node_name,
                  "Communication failure in hooks execution: %s", msg)
    if res.offline or msg:
      # No need to investigate payload if node is offline or gave an error.
      # _ErrorIf only overrides self.bad, so a real communication failure
      # has to be recorded in lu_result by hand. Offline nodes are not an
      # error and must not overwrite a failure recorded for another node.
      if test:
        lu_result = 0
      continue
    for script, hkr, output in res.payload:
      test = hkr == constants.HKR_FAIL
      self._ErrorIf(test, self.ENODEHOOKS, node_name,
                    "Script %s failed, output:", script)
      if test:
        output = indent_re.sub(' ', output)
        feedback_fn("%s" % output)
        lu_result = 0

  return lu_result
1957
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  The logical volumes of all running, network-mirrored instances are
  compared against the volumes reported by the nodes.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Request all node and instance locks, every level in shared mode.

    """
    self.needed_locks = dict([
      (locking.LEVEL_NODE, locking.ALL_SET),
      (locking.LEVEL_INSTANCE, locking.ALL_SET),
      ])
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @param feedback_fn: feedback function (not used by this method)
    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = []
    res_missing = {}
    result = (res_nodes, res_instances, res_missing)

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # map every (node, volume) pair we expect to find to its instance;
    # only running instances with a network-mirrored template are checked
    expected = {}
    for inst in instances:
      if inst.admin_up and inst.disk_template in constants.DTS_NET_MIRROR:
        lvs_by_node = {}
        inst.MapLVsByNode(lvs_by_node)
        for node, vol_list in lvs_by_node.iteritems():
          for vol in vol_list:
            expected[(node, vol)] = inst

    if not expected:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      for lv_name, (_, _, lv_online) in node_res.payload.items():
        # every volume the node reports is removed from the expected set
        owner = expected.pop((node, lv_name), None)
        if lv_online or owner is None:
          continue
        if owner.name not in res_instances:
          res_instances.append(owner.name)

    # any leftover items in the expected map are missing LVs, let's
    # arrange the data better
    for key, inst in expected.iteritems():
      res_missing.setdefault(inst.name, []).append(key)

    return result
2040
class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  The size recorded in the configuration for each disk of the selected
  instances is compared with the size reported by the instance's primary
  node, and the configuration is corrected on mismatch.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the instance list and declare the needed locks.

    With an explicit instance list only those instances are locked and the
    node locks are computed later in L{DeclareLocks}; with an empty list
    all nodes and instances are locked. All locks are shared.

    @raise errors.OpPrereqError: if C{instances} is not a list

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    """Lock the primary nodes of the explicitly requested instances.

    """
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      # no explicit list was given: work on every instance we hold a lock on
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object
    @rtype: boolean
    @return: True if the size of the first child, or of any of its own
        first children, had to be raised; False otherwise

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    @param feedback_fn: feedback function, passed on to the config update
    @rtype: list
    @return: list of (instance name, disk index, new size) tuples, one for
        each correction written to the configuration

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      # query using copies, so that SetDiskID does not touch the
      # configuration objects themselves
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      # the per-disk answers are read from the payload, as for every other
      # RPC result that is checked through fail_msg
      sizes = result.payload
      if len(sizes) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, sizes):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        # presumably converts a byte count into the mebibytes used for
        # disk.size -- TODO confirm the unit returned by the node
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
2160
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: tuple of (environment dict, list containing only the master
        node, list of all nodes)

    """
    env = {}
    env["OP_TARGET"] = self.cfg.GetClusterName()
    env["NEW_NAME"] = self.op.name
    master = self.cfg.GetMasterNode()
    return env, [master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    The name is resolved, the resulting IP is remembered in C{self.ip} and
    C{self.op.name} is replaced by the resolved name.

    @raise errors.OpPrereqError: if neither name nor IP would change, or
        if a changed IP already answers on the node daemon port

    """
    resolved = utils.GetHostInfo(self.op.name)

    target_name = resolved.name
    target_ip = resolved.ip
    self.ip = target_ip
    current_name = self.cfg.GetClusterName()
    current_ip = self.cfg.GetMasterIP()
    if (target_name, target_ip) == (current_name, current_ip):
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if (target_ip != current_ip and
        utils.TcpPing(target_ip, constants.DEFAULT_NODED_PORT)):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
                                 target_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = target_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master role is stopped, the configuration and the known hosts file
    are updated and pushed to the other nodes, and the master role is
    started again no matter whether the update succeeded.

    """
    new_cluster_name = self.op.name
    new_master_ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    stop_result = self.rpc.call_node_stop_master(master, False)
    stop_result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = new_cluster_name
      cluster.master_ip = new_master_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      other_nodes = self.cfg.GetNodeList()
      if master in other_nodes:
        other_nodes.remove(master)
      upload = self.rpc.call_upload_file(other_nodes,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in upload.iteritems():
        err = to_result.fail_msg
        if not err:
          continue
        self.proc.LogWarning("Copy of file %s to node %s failed: %s" %
                             (constants.SSH_KNOWN_HOSTS_FILE, to_node, err))

    finally:
      # always try to bring the master role back
      start_result = self.rpc.call_node_start_master(master, False, False)
      err = start_result.fail_msg
      if err:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", err)
2244
def _RecursiveCheckIfLVMBased(disk):
  """Tell whether a disk, or anything below it, is a logical volume.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: True if the disk itself or any of its descendants has the
      LD_LV dev_type, False otherwise

  """
  # descendants are examined first, depth-first, stopping at the first hit
  for child in disk.children or []:
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
2260
2261 2262 -class LUSetClusterParams(LogicalUnit):
2263 """Change the parameters of the cluster. 2264 2265 """ 2266 HPATH = "cluster-modify" 2267 HTYPE = constants.HTYPE_CLUSTER 2268 _OP_REQP = [] 2269 REQ_BGL = False 2270
2271 - def CheckArguments(self):
2272 """Check parameters 2273 2274 """ 2275 for attr in ["candidate_pool_size", 2276 "uid_pool", "add_uids", "remove_uids"]: 2277 if not hasattr(self.op, attr): 2278 setattr(self.op, attr, None) 2279 2280 if self.op.candidate_pool_size is not None: 2281 try: 2282 self.op.candidate_pool_size = int(self.op.candidate_pool_size) 2283 except (ValueError, TypeError), err: 2284 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % 2285 str(err), errors.ECODE_INVAL) 2286 if self.op.candidate_pool_size < 1: 2287 raise errors.OpPrereqError("At least one master candidate needed", 2288 errors.ECODE_INVAL) 2289 2290 _CheckBooleanOpField(self.op, "maintain_node_health") 2291 2292 if self.op.uid_pool: 2293 uidpool.CheckUidPool(self.op.uid_pool) 2294 2295 if self.op.add_uids: 2296 uidpool.CheckUidPool(self.op.add_uids) 2297 2298 if self.op.remove_uids: 2299 uidpool.CheckUidPool(self.op.remove_uids)
2300
2301 - def ExpandNames(self):
2302 # FIXME: in the future maybe other cluster params won't require checking on 2303 # all nodes to be modified. 2304 self.needed_locks = { 2305 locking.LEVEL_NODE: locking.ALL_SET, 2306 } 2307 self.share_locks[locking.LEVEL_NODE] = 1
2308
2309 - def BuildHooksEnv(self):
2310 """Build hooks env. 2311 2312 """ 2313 env = { 2314 "OP_TARGET": self.cfg.GetClusterName(), 2315 "NEW_VG_NAME": self.op.vg_name, 2316 } 2317 mn = self.cfg.GetMasterNode() 2318 return env, [mn], [mn]
2319
2320 - def CheckPrereq(self):
2321 """Check prerequisites. 2322 2323 This checks whether the given params don't conflict and 2324 if the given volume group is valid. 2325 2326 """ 2327 if self.op.vg_name is not None and not self.op.vg_name: 2328 instances = self.cfg.GetAllInstancesInfo().values() 2329 for inst in instances: 2330 for disk in inst.disks: 2331 if _RecursiveCheckIfLVMBased(disk): 2332 raise errors.OpPrereqError("Cannot disable lvm storage while" 2333 " lvm-based instances exist", 2334 errors.ECODE_INVAL) 2335 2336 node_list = self.acquired_locks[locking.LEVEL_NODE] 2337 2338 # if vg_name not None, checks given volume group on all nodes 2339 if self.op.vg_name: 2340 vglist = self.rpc.call_vg_list(node_list) 2341 for node in node_list: 2342 msg = vglist[node].fail_msg 2343 if msg: 2344 # ignoring down node 2345 self.LogWarning("Error while gathering data on node %s" 2346 " (ignoring node): %s", node, msg) 2347 continue 2348 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload, 2349 self.op.vg_name, 2350 constants.MIN_VG_SIZE) 2351 if vgstatus: 2352 raise errors.OpPrereqError("Error on node '%s': %s" % 2353 (node, vgstatus), errors.ECODE_ENVIRON) 2354 2355 self.cluster = cluster = self.cfg.GetClusterInfo() 2356 # validate params changes 2357 if self.op.beparams: 2358 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) 2359 self.new_beparams = objects.FillDict( 2360 cluster.beparams[constants.PP_DEFAULT], self.op.beparams) 2361 2362 if self.op.nicparams: 2363 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) 2364 self.new_nicparams = objects.FillDict( 2365 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) 2366 objects.NIC.CheckParameterSyntax(self.new_nicparams) 2367 nic_errors = [] 2368 2369 # check all instances for consistency 2370 for instance in self.cfg.GetAllInstancesInfo().values(): 2371 for nic_idx, nic in enumerate(instance.nics): 2372 params_copy = copy.deepcopy(nic.nicparams) 2373 params_filled = 
objects.FillDict(self.new_nicparams, params_copy) 2374 2375 # check parameter syntax 2376 try: 2377 objects.NIC.CheckParameterSyntax(params_filled) 2378 except errors.ConfigurationError, err: 2379 nic_errors.append("Instance %s, nic/%d: %s" % 2380 (instance.name, nic_idx, err)) 2381 2382 # if we're moving instances to routed, check that they have an ip 2383 target_mode = params_filled[constants.NIC_MODE] 2384 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: 2385 nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % 2386 (instance.name, nic_idx)) 2387 if nic_errors: 2388 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % 2389 "\n".join(nic_errors)) 2390 2391 # hypervisor list/parameters 2392 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) 2393 if self.op.hvparams: 2394 if not isinstance(self.op.hvparams, dict): 2395 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", 2396 errors.ECODE_INVAL) 2397 for hv_name, hv_dict in self.op.hvparams.items(): 2398 if hv_name not in self.new_hvparams: 2399 self.new_hvparams[hv_name] = hv_dict 2400 else: 2401 self.new_hvparams[hv_name].update(hv_dict) 2402 2403 # os hypervisor parameters 2404 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) 2405 if self.op.os_hvp: 2406 if not isinstance(self.op.os_hvp, dict): 2407 raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input", 2408 errors.ECODE_INVAL) 2409 for os_name, hvs in self.op.os_hvp.items(): 2410 if not isinstance(hvs, dict): 2411 raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on" 2412 " input"), errors.ECODE_INVAL) 2413 if os_name not in self.new_os_hvp: 2414 self.new_os_hvp[os_name] = hvs 2415 else: 2416 for hv_name, hv_dict in hvs.items(): 2417 if hv_name not in self.new_os_hvp[os_name]: 2418 self.new_os_hvp[os_name][hv_name] = hv_dict 2419 else: 2420 self.new_os_hvp[os_name][hv_name].update(hv_dict) 2421 2422 # changes to the hypervisor list 2423 if self.op.enabled_hypervisors 
is not None: 2424 self.hv_list = self.op.enabled_hypervisors 2425 if not self.hv_list: 2426 raise errors.OpPrereqError("Enabled hypervisors list must contain at" 2427 " least one member", 2428 errors.ECODE_INVAL) 2429 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES 2430 if invalid_hvs: 2431 raise errors.OpPrereqError("Enabled hypervisors contains invalid" 2432 " entries: %s" % 2433 utils.CommaJoin(invalid_hvs), 2434 errors.ECODE_INVAL) 2435 for hv in self.hv_list: 2436 # if the hypervisor doesn't already exist in the cluster 2437 # hvparams, we initialize it to empty, and then (in both 2438 # cases) we make sure to fill the defaults, as we might not 2439 # have a complete defaults list if the hypervisor wasn't 2440 # enabled before 2441 if hv not in new_hvp: 2442 new_hvp[hv] = {} 2443 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv]) 2444 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES) 2445 else: 2446 self.hv_list = cluster.enabled_hypervisors 2447 2448 if self.op.hvparams or self.op.enabled_hypervisors is not None: 2449 # either the enabled list has changed, or the parameters have, validate 2450 for hv_name, hv_params in self.new_hvparams.items(): 2451 if ((self.op.hvparams and hv_name in self.op.hvparams) or 2452 (self.op.enabled_hypervisors and 2453 hv_name in self.op.enabled_hypervisors)): 2454 # either this is a new hypervisor, or its parameters have changed 2455 hv_class = hypervisor.GetHypervisor(hv_name) 2456 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 2457 hv_class.CheckParameterSyntax(hv_params) 2458 _CheckHVParams(self, node_list, hv_name, hv_params) 2459 2460 if self.op.os_hvp: 2461 # no need to check any newly-enabled hypervisors, since the 2462 # defaults have already been checked in the above code-block 2463 for os_name, os_hvp in self.new_os_hvp.items(): 2464 for hv_name, hv_params in os_hvp.items(): 2465 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 2466 # we need to 
fill in the new os_hvp on top of the actual hv_p 2467 cluster_defaults = self.new_hvparams.get(hv_name, {}) 2468 new_osp = objects.FillDict(cluster_defaults, hv_params) 2469 hv_class = hypervisor.GetHypervisor(hv_name) 2470 hv_class.CheckParameterSyntax(new_osp) 2471 _CheckHVParams(self, node_list, hv_name, new_osp)
2472 2473
2474 - def Exec(self, feedback_fn):
2475 """Change the parameters of the cluster. 2476 2477 """ 2478 if self.op.vg_name is not None: 2479 new_volume = self.op.vg_name 2480 if not new_volume: 2481 new_volume = None 2482 if new_volume != self.cfg.GetVGName(): 2483 self.cfg.SetVGName(new_volume) 2484 else: 2485 feedback_fn("Cluster LVM configuration already in desired" 2486 " state, not changing") 2487 if self.op.hvparams: 2488 self.cluster.hvparams = self.new_hvparams 2489 if self.op.os_hvp: 2490 self.cluster.os_hvp = self.new_os_hvp 2491 if self.op.enabled_hypervisors is not None: 2492 self.cluster.hvparams = self.new_hvparams 2493 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors 2494 if self.op.beparams: 2495 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams 2496 if self.op.nicparams: 2497 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams 2498 2499 if self.op.candidate_pool_size is not None: 2500 self.cluster.candidate_pool_size = self.op.candidate_pool_size 2501 # we need to update the pool size here, otherwise the save will fail 2502 _AdjustCandidatePool(self, []) 2503 2504 if self.op.maintain_node_health is not None: 2505 self.cluster.maintain_node_health = self.op.maintain_node_health 2506 2507 if self.op.add_uids is not None: 2508 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids) 2509 2510 if self.op.remove_uids is not None: 2511 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids) 2512 2513 if self.op.uid_pool is not None: 2514 self.cluster.uid_pool = self.op.uid_pool 2515 2516 self.cfg.Update(self.cluster, feedback_fn)
2517
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Push the non-config cluster files to the other nodes.

  ConfigWriter already distributes the config and ssconf files; this
  function copies the remaining files that every node should have.
  Upload failures are only logged as warnings.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes: online nodes plus the extra ones, minus master
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  targets = lu.cfg.GetOnlineNodeList()
  if additional_nodes is not None:
    targets.extend(additional_nodes)
  if master_info.name in targets:
    targets.remove(master_info.name)

  # 2. Gather files to distribute
  files = set([
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    ])

  for hv_name in lu.cfg.GetClusterInfo().enabled_hypervisors:
    files.update(hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # 3. Perform the files upload, skipping files missing on this host
  for fname in files:
    if not os.path.exists(fname):
      continue
    upload = lu.rpc.call_upload_file(targets, fname)
    for to_node, to_result in upload.items():
      err = to_result.fail_msg
      if err:
        lu.proc.LogWarning("Copy of file %s to node %s failed: %s" %
                           (fname, to_node, err))
2561
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Request a shared lock on all nodes.

    """
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    There is nothing to check.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    The cluster object is written back unchanged and the ancillary files
    are pushed to the nodes.

    """
    cluster = self.cfg.GetClusterInfo()
    self.cfg.Update(cluster, feedback_fn)
    _RedistributeAncillaryFiles(self)
2589
def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Poll the primary node until the instance's disks are in sync.

  @param lu: the calling logical unit (gives access to cfg, rpc and logging)
  @param instance: the instance whose disks are checked
  @param disks: optional list of disks to check; it is passed through
      C{_ExpandCheckDisks}, and an empty list means there is nothing to do
  @param oneshot: if True, stop after one successful status query instead
      of waiting for the sync to finish (degraded devices are still
      re-checked a few times)
  @rtype: boolean
  @return: True if there was nothing to check or no device was found
      degraded in the last poll, False otherwise
  @raise errors.RemoteError: after ten consecutive failed status queries

  """
  if not instance.disks:
    return True
  if disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  pnode = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, pnode)

  # TODO: Convert to utils.Retry

  rpc_failures = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    reply = lu.rpc.call_blockdev_getmirrorstatus(pnode, disks)
    msg = reply.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", pnode, msg)
      rpc_failures += 1
      if rpc_failures >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % pnode)
      time.sleep(6)
      continue
    statuses = reply.payload
    # a successful query resets the failure counter
    rpc_failures = 0
    for idx, mstat in enumerate(statuses):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      pnode, disks[idx].iv_name)
        continue

      # degraded without a sync in progress is what counts as degraded
      if (not cumul_degraded and
          mstat.is_degraded and mstat.sync_percent is None):
        cumul_degraded = True

      if mstat.sync_percent is None:
        continue

      done = False
      if mstat.estimated_time is None:
        rem_time = "no time estimate"
      else:
        rem_time = "%d estimated seconds remaining" % mstat.estimated_time
        max_time = mstat.estimated_time
      lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                      (disks[idx].iv_name, mstat.sync_percent, rem_time))

    finished = done or oneshot

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if finished and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if finished:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
2663
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that a device and its children are not degraded on a node.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).
  Children are always checked with the default (is_degraded) test.

  @param lu: the calling logical unit (gives access to cfg, rpc and logging)
  @param dev: the disk to check
  @param node: the node on which the disk is checked
  @param on_primary: whether the node is the primary one; if not, the
      device itself is only queried when C{dev.AssembleOnSecondary()} says so
  @param ldisk: whether to test the local disk status instead
  @rtype: boolean
  @return: True if the device and all of its children passed the test

  """
  lu.cfg.SetDiskID(dev, node)

  consistent = True

  if on_primary or dev.AssembleOnSecondary():
    found = lu.rpc.call_blockdev_find(node, dev)
    err = found.fail_msg
    if err:
      lu.LogWarning("Can't find disk on node %s: %s", node, err)
      consistent = False
    elif not found.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      consistent = False
    elif ldisk:
      consistent = found.payload.ldisk_status == constants.LDS_OKAY
    else:
      consistent = not found.payload.is_degraded

  # children are not looked at any more once a failure has been seen
  for child in dev.children or ():
    if not consistent:
      break
    consistent = _CheckDiskConsistency(lu, child, node, on_primary)

  return consistent
2697
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
  # Fields that need calculation of global os validity
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])

  def ExpandNames(self):
    """Validate the query; no locks are requested.

    @raise errors.OpPrereqError: if specific OS names were requested

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    There is nothing to check.

    """

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and the per-node RPC
        results (with C{fail_msg} and C{payload}) as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and lists of (path, status, diagnose, variants)
        tuples as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
                                     (/srv/..., False, "invalid api", [])],
                           "node2": [(/srv/..., True, "", [])]}
          }

    """
    by_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [nname for nname in rlist if not rlist[nname].fail_msg]
    for node_name, nres in rlist.items():
      if nres.fail_msg or not nres.payload:
        continue
      for name, path, status, diagnose, variants in nres.payload:
        if name not in by_os:
          # every responding node starts with an empty list for this os
          by_os[name] = dict((nname, []) for nname in good_nodes)
        by_os[name][node_name].append((path, status, diagnose, variants))
    return by_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    @param feedback_fn: feedback function (not used by this method)
    @rtype: list
    @return: one row per OS, each row holding the values of the requested
        output fields, in the requested order

    """
    online_nodes = list(self.cfg.GetOnlineNodeList())
    node_data = self.rpc.call_os_diagnose(online_nodes)
    per_os = self._DiagnoseByOS(node_data)
    rows = []
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
    calc_variants = "variants" in self.op.output_fields

    for os_name, os_data in per_os.items():
      if calc_valid:
        # an OS is valid only if the first entry on every node is valid;
        # the variants are those common to all nodes
        valid = True
        variants = None
        for osl in os_data.values():
          valid = bool(valid and osl and osl[0][1])
          if not valid:
            variants = set()
            break
          if not calc_variants:
            continue
          node_variants = osl[0][3]
          if variants is None:
            variants = set(node_variants)
          else:
            variants.intersection_update(node_variants)

      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = dict(os_data)
        elif field == "variants":
          val = list(variants)
        else:
          raise errors.ParameterError(field)
        row.append(val)
      rows.append(row)

    return rows
2812
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    @return: tuple of (environment dict, pre-hook nodes, post-hook nodes);
        both node lists are all nodes except the one being removed

    """
    target = self.op.node_name
    env = {
      "OP_TARGET": target,
      "NODE_NAME": target,
      }
    remaining = self.cfg.GetNodeList()
    if target in remaining:
      remaining.remove(target)
    else:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", target)
    return env, remaining, remaining

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_names = self.cfg.GetInstanceList()

    if node.name == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for iname in instance_names:
      inst = self.cfg.GetInstanceInfo(iname)
      if node.name not in inst.all_nodes:
        continue
      raise errors.OpPrereqError("Instance %s is still running on the node,"
                                 " please remove first." % iname,
                                 errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    Failures of the post hooks or of the remote cleanup are only logged
    as warnings.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    cluster = self.cfg.GetClusterInfo()
    modify_ssh_setup = cluster.modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hooks_master = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hooks_master.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    leave_result = self.rpc.call_node_leave_cluster(node.name,
                                                    modify_ssh_setup)
    err = leave_result.fail_msg
    if err:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", err)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.RemoveHostFromEtcHosts(node.name)
      _RedistributeAncillaryFiles(self)
2906
class LUQueryNodes(NoHooksLU):
  """Logical unit that answers queries about nodes.

  """
  # pylint: disable-msg=W0142
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  # fields read directly as attributes of the node object
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  # fields filled from the result of the node_info RPC call
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  # fields computed from the configuration only
  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    """Validate the requested fields and compute the needed node locks.

    Node locks are declared only if at least one requested field is not
    a static one and the opcode asks for locking.

    """
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if not self.op.names:
      self.wanted = locking.ALL_SET
    else:
      self.wanted = _GetWantedNodes(self, self.op.names)

    selected = self.op.output_fields
    self.do_node_query = self._FIELDS_STATIC.NonMatching(selected)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites; there is nothing to verify here.

    A non-empty list of names has already gone through _GetWantedNodes
    in ExpandNames, and an empty one needs no validation.

    """

  def Exec(self, feedback_fn):
    """Compute the requested fields for the selected nodes.

    @param feedback_fn: not used by this method
    @return: a list with one entry per node (in NiceSort order of the
        node names), each entry being the list of values for the
        requested output fields, in the requested order

    """
    cfg_nodes = self.cfg.GetAllNodesInfo()

    # select the names of the nodes to report on
    if self.do_locking:
      names = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      names = self.wanted
      gone = set(names).difference(cfg_nodes.keys())
      if gone:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % gone)
    else:
      names = cfg_nodes.keys()

    names = utils.NiceSort(names)
    nodes = [cfg_nodes[node_name] for node_name in names]

    # begin data gathering

    if self.do_node_query:
      # pairs of (output field, key in the RPC payload) for the values
      # which are converted to integers; bootid is passed on unconverted
      int_fields = (
        ("mtotal", "memory_total"),
        ("mnode", "memory_dom0"),
        ("mfree", "memory_free"),
        ("dtotal", "vg_size"),
        ("dfree", "vg_free"),
        ("ctotal", "cpu_total"),
        ("cnodes", "cpu_nodes"),
        ("csockets", "cpu_sockets"),
        )
      rpc_results = self.rpc.call_node_info(names, self.cfg.GetVGName(),
                                            self.cfg.GetHypervisorType())
      live_data = {}
      for node_name in names:
        result = rpc_results[node_name]
        if result.fail_msg or not result.payload:
          # failed call or no data: all dynamic fields will be None
          live_data[node_name] = {}
          continue
        payload = result.payload
        entry = dict([(field, utils.TryConvert(int, payload.get(key, None)))
                      for (field, key) in int_fields])
        entry["bootid"] = payload.get("bootid", None)
        live_data[node_name] = entry
    else:
      # no RPC was done; the nodes share a single, read-only, empty dict
      no_data = {}
      live_data = dict([(node_name, no_data) for node_name in names])

    node_to_primary = dict([(node_name, set()) for node_name in names])
    node_to_secondary = dict([(node_name, set()) for node_name in names])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields.intersection(self.op.output_fields):
      # the instance data is only read if an instance field is requested
      for inst in self.cfg.GetAllInstancesInfo().values():
        primary = inst.primary_node
        if primary in node_to_primary:
          node_to_primary[primary].add(inst.name)
        for secondary in inst.secondary_nodes:
          if secondary in node_to_secondary:
            node_to_secondary[secondary].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    def _FieldValue(node, field):
      """Return the value of one output field for one node.

      """
      if field in self._SIMPLE_FIELDS:
        return getattr(node, field)
      if field == "pinst_list":
        return list(node_to_primary[node.name])
      if field == "sinst_list":
        return list(node_to_secondary[node.name])
      if field == "pinst_cnt":
        return len(node_to_primary[node.name])
      if field == "sinst_cnt":
        return len(node_to_secondary[node.name])
      if field == "pip":
        return node.primary_ip
      if field == "sip":
        return node.secondary_ip
      if field == "tags":
        return list(node.GetTags())
      if field == "master":
        return node.name == master_node
      if self._FIELDS_DYNAMIC.Matches(field):
        return live_data[node.name].get(field, None)
      if field == "role":
        # the first matching condition decides the role letter
        if node.name == master_node:
          return "M"
        if node.master_candidate:
          return "C"
        if node.drained:
          return "D"
        if node.offline:
          return "O"
        return "R"
      raise errors.ParameterError(field)

    return [[_FieldValue(node, field) for field in self.op.output_fields]
            for node in nodes]
3067
3068 3069 -class LUQueryNodeVolumes(NoHooksLU):
3070 """Logical unit for getting volumes on node(s). 3071 3072 """ 3073 _OP_REQP = ["nodes", "output_fields"] 3074 REQ_BGL = False 3075 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") 3076 _FIELDS_STATIC = utils.FieldSet("node") 3077
def ExpandNames(self):
  """Validate the requested fields and declare the needed node locks.

  The node locks are requested in shared mode: for all nodes if the
  opcode names no nodes, otherwise for the nodes returned by
  _GetWantedNodes for the given names.

  """
  _CheckOutputFields(static=self._FIELDS_STATIC,
                     dynamic=self._FIELDS_DYNAMIC,
                     selected=self.op.output_fields)

  self.needed_locks = {}
  self.share_locks[locking.LEVEL_NODE] = 1
  if self.op.nodes:
    node_locks = _GetWantedNodes(self, self.op.nodes)
  else:
    node_locks = locking.ALL_SET
  self.needed_locks[locking.LEVEL_NODE] = node_locks
3090