Package ganeti :: Module cmdlib
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Module implementing the master-side code.""" 
  23   
  24  # pylint: disable-msg=W0201 
  25   
  26  # W0201 since most LU attributes are defined in CheckPrereq or similar 
  27  # functions 
  28   
  29  import os 
  30  import os.path 
  31  import time 
  32  import re 
  33  import platform 
  34  import logging 
  35  import copy 
  36   
  37  from ganeti import ssh 
  38  from ganeti import utils 
  39  from ganeti import errors 
  40  from ganeti import hypervisor 
  41  from ganeti import locking 
  42  from ganeti import constants 
  43  from ganeti import objects 
  44  from ganeti import serializer 
  45  from ganeti import ssconf 
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    @param processor: the processor running this LU; its LogWarning and
        LogInfo methods are re-exported as self.LogWarning/self.LogInfo
    @param op: the opcode to execute
    @param context: the context object; its C{cfg} attribute is stored as
        self.cfg
    @param rpc: the RPC runner, stored as self.rpc
    @raise errors.OpPrereqError: if any parameter named in C{_OP_REQP} is
        missing from the opcode or set to None

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    # the syntactic check runs last, once all required parameters are known
    # to be present
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    The runner is created on first access, for the cluster name taken from
    the configuration, and cached for later accesses.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    @raise NotImplementedError: if the LU does not require the BGL
        (REQ_BGL is false) and does not override this method

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    @raise errors.OpPrereqError: if the instance name cannot be expanded

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    # note: the single expanded name (not a list) is stored for this level
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    # NOTE: any mode other than LOCKS_REPLACE/LOCKS_APPEND leaves
    # self.needed_locks untouched
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
326
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLu.

    This just raises an error: with HPATH set to None this method is not
    supposed to be called at all.

    @raise AssertionError: always

    """
    # an explicit raise (rather than "assert False") keeps the check active
    # even when Python runs with optimizations (-O) enabled
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
345
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: non-empty list of node names to be expanded
  @rtype: list
  @return: the list of expanded node names, sorted
  @raise errors.OpPrereqError: if the nodes parameter is not a list, or if
      any of the given names cannot be expanded to a known node
  @raise errors.ProgrammerError: if the nodes list is empty

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
374
375 376 -def _GetWantedInstances(lu, instances):
377 """Returns list of checked and expanded instance names. 378 379 @type lu: L{LogicalUnit} 380 @param lu: the logical unit on whose behalf we execute 381 @type instances: list 382 @param instances: list of instance names or None for all instances 383 @rtype: list 384 @return: the list of instances, sorted 385 @raise errors.OpPrereqError: if the instances parameter is wrong type 386 @raise errors.OpPrereqError: if any of the passed instances is not found 387 388 """ 389 if not isinstance(instances, list): 390 raise errors.OpPrereqError("Invalid argument type 'instances'") 391 392 if instances: 393 wanted = [] 394 395 for name in instances: 396 instance = lu.cfg.ExpandInstanceName(name) 397 if instance is None: 398 raise errors.OpPrereqError("No such instance name '%s'" % name) 399 wanted.append(instance) 400 401 else: 402 wanted = utils.NiceSort(lu.cfg.GetInstanceList()) 403 return wanted
404
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  The selected fields are matched against the union of the static and the
  dynamic field sets.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the list of selected output field names
  @raise errors.OpPrereqError: if any selected field matches neither set

  """
  known_fields = utils.FieldSet()
  for field_set in (static, dynamic):
    known_fields.Extend(field_set)

  unknown = known_fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown))
423
424 425 -def _CheckBooleanOpField(op, name):
426 """Validates boolean opcode parameters. 427 428 This will ensure that an opcode parameter is either a boolean value, 429 or None (but that it always exists). 430 431 """ 432 val = getattr(op, name, None) 433 if not (val is None or isinstance(val, bool)): 434 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" % 435 (name, str(val))) 436 setattr(op, name, val)
437
438 439 -def _CheckNodeOnline(lu, node):
440 """Ensure that a given node is online. 441 442 @param lu: the LU on behalf of which we make the check 443 @param node: the node to check 444 @raise errors.OpPrereqError: if the node is offline 445 446 """ 447 if lu.cfg.GetNodeInfo(node).offline: 448 raise errors.OpPrereqError("Can't use offline node %s" % node)
449
450 451 -def _CheckNodeNotDrained(lu, node):
452 """Ensure that a given node is not drained. 453 454 @param lu: the LU on behalf of which we make the check 455 @param node: the node to check 456 @raise errors.OpPrereqError: if the node is drained 457 458 """ 459 if lu.cfg.GetNodeInfo(node).drained: 460 raise errors.OpPrereqError("Can't use drained node %s" % node)
461
462 463 -def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, 464 memory, vcpus, nics, disk_template, disks, 465 bep, hvp, hypervisor_name):
466 """Builds instance related env variables for hooks 467 468 This builds the hook environment from individual variables. 469 470 @type name: string 471 @param name: the name of the instance 472 @type primary_node: string 473 @param primary_node: the name of the instance's primary node 474 @type secondary_nodes: list 475 @param secondary_nodes: list of secondary nodes as strings 476 @type os_type: string 477 @param os_type: the name of the instance's OS 478 @type status: boolean 479 @param status: the should_run status of the instance 480 @type memory: string 481 @param memory: the memory size of the instance 482 @type vcpus: string 483 @param vcpus: the count of VCPUs the instance has 484 @type nics: list 485 @param nics: list of tuples (ip, bridge, mac) representing 486 the NICs the instance has 487 @type disk_template: string 488 @param disk_template: the disk template of the instance 489 @type disks: list 490 @param disks: the list of (size, mode) pairs 491 @type bep: dict 492 @param bep: the backend parameters for the instance 493 @type hvp: dict 494 @param hvp: the hypervisor parameters for the instance 495 @type hypervisor_name: string 496 @param hypervisor_name: the hypervisor for the instance 497 @rtype: dict 498 @return: the hook environment for this instance 499 500 """ 501 if status: 502 str_status = "up" 503 else: 504 str_status = "down" 505 env = { 506 "OP_TARGET": name, 507 "INSTANCE_NAME": name, 508 "INSTANCE_PRIMARY": primary_node, 509 "INSTANCE_SECONDARIES": " ".join(secondary_nodes), 510 "INSTANCE_OS_TYPE": os_type, 511 "INSTANCE_STATUS": str_status, 512 "INSTANCE_MEMORY": memory, 513 "INSTANCE_VCPUS": vcpus, 514 "INSTANCE_DISK_TEMPLATE": disk_template, 515 "INSTANCE_HYPERVISOR": hypervisor_name, 516 } 517 518 if nics: 519 nic_count = len(nics) 520 for idx, (ip, bridge, mac) in enumerate(nics): 521 if ip is None: 522 ip = "" 523 env["INSTANCE_NIC%d_IP" % idx] = ip 524 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge 525 env["INSTANCE_NIC%d_MAC" % 
idx] = mac 526 else: 527 nic_count = 0 528 529 env["INSTANCE_NIC_COUNT"] = nic_count 530 531 if disks: 532 disk_count = len(disks) 533 for idx, (size, mode) in enumerate(disks): 534 env["INSTANCE_DISK%d_SIZE" % idx] = size 535 env["INSTANCE_DISK%d_MODE" % idx] = mode 536 else: 537 disk_count = 0 538 539 env["INSTANCE_DISK_COUNT"] = disk_count 540 541 for source, kind in [(bep, "BE"), (hvp, "HV")]: 542 for key, value in source.items(): 543 env["INSTANCE_%s_%s" % (kind, key)] = value 544 545 return env
546
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  The backend and hypervisor parameters are filled in from the cluster
  defaults, and the result is passed to _BuildInstanceHookEnv.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values (keys are _BuildInstanceHookEnv argument names)
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  be_params = cluster.FillBE(instance)
  hv_params = cluster.FillHV(instance)

  nic_tuples = [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics]
  disk_tuples = [(disk.size, disk.mode) for disk in instance.disks]

  kwargs = dict(
    name=instance.name,
    primary_node=instance.primary_node,
    secondary_nodes=instance.secondary_nodes,
    os_type=instance.os,
    status=instance.admin_up,
    memory=be_params[constants.BE_MEMORY],
    vcpus=be_params[constants.BE_VCPUS],
    nics=nic_tuples,
    disk_template=instance.disk_template,
    disks=disk_tuples,
    bep=be_params,
    hvp=hv_params,
    hypervisor_name=instance.hypervisor,
    )
  if override:
    kwargs.update(override)
  return _BuildInstanceHookEnv(**kwargs)
584
585 586 -def _AdjustCandidatePool(lu):
587 """Adjust the candidate pool after node operations. 588 589 """ 590 mod_list = lu.cfg.MaintainCandidatePool() 591 if mod_list: 592 lu.LogInfo("Promoted nodes to master candidate role: %s", 593 ", ".join(node.name for node in mod_list)) 594 for name in mod_list: 595 lu.context.ReaddNode(name) 596 mc_now, mc_max = lu.cfg.GetMasterCandidateStats() 597 if mc_now > mc_max: 598 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % 599 (mc_now, mc_max))
600
601 602 -def _CheckInstanceBridgesExist(lu, instance):
603 """Check that the bridges needed by an instance exist. 604 605 """ 606 # check bridges existence 607 brlist = [nic.bridge for nic in instance.nics] 608 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist) 609 result.Raise() 610 if not result.data: 611 raise errors.OpPrereqError("One or more target bridges %s does not" 612 " exist on destination node '%s'" % 613 (brlist, instance.primary_node))
614
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty, i.e. the master is the only
    node and there are no instances left.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      # count the nodes other than the master explicitly; "len - 1" would
      # be wrong if the single remaining node is not the master
      other_nodes = [name for name in nodelist if name != master]
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % len(other_nodes))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role on the master node, then backs up the private
    and public key files returned by ssh.GetUserFiles for the
    constants.GANETI_RUNAS user.

    @return: the master node, as returned by the configuration
    @raise errors.OpExecError: if the master role could not be disabled

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
654
655 656 -class LUVerifyCluster(LogicalUnit):
657 """Verifies the cluster status. 658 659 """ 660 HPATH = "cluster-verify" 661 HTYPE = constants.HTYPE_CLUSTER 662 _OP_REQP = ["skip_checks"] 663 REQ_BGL = False 664
def ExpandNames(self):
  """Declare the locks needed for cluster verification.

  All node and all instance locks are requested, and every locking level
  is marked as shared.

  """
  self.share_locks = dict([(level, 1) for level in locking.LEVELS])
  self.needed_locks = {
    locking.LEVEL_INSTANCE: locking.ALL_SET,
    locking.LEVEL_NODE: locking.ALL_SET,
    }
671
def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                node_result, feedback_fn, master_files,
                drbd_map, vg_name):
  """Run multiple tests against a node.

  Test list:

    - compares ganeti version
    - checks vg existence and minimum size (constants.MIN_VG_SIZE)
    - checks config file checksum
    - checks ssh to other nodes
    - checks tcp connectivity to other nodes
    - checks the hypervisor verification results
    - checks the used drbd minors against the expected ones

  Every condition reported with an "ERROR" prefix marks the node as bad;
  "WARNING" messages (software version mismatch) do not.

  @type nodeinfo: L{objects.Node}
  @param nodeinfo: the node to check
  @param file_list: required list of files
  @param local_cksum: dictionary of local files and their checksums
  @param node_result: the results from the node
  @param feedback_fn: function used to accumulate results
  @param master_files: list of files that only masters should have
  @param drbd_map: the used drbd minors for this node, in
      form of minor: (instance, must_exist) which correspond to instances
      and their running status
  @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
  @rtype: boolean
  @return: True if any error was found, False otherwise

  """
  node = nodeinfo.name

  # main result, node_result should be a non-empty dict
  if not node_result or not isinstance(node_result, dict):
    feedback_fn(" - ERROR: unable to verify node %s." % (node,))
    return True

  # compares ganeti version
  local_version = constants.PROTOCOL_VERSION
  remote_version = node_result.get('version', None)
  if not (remote_version and isinstance(remote_version, (list, tuple)) and
          len(remote_version) == 2):
    feedback_fn(" - ERROR: connection to %s failed" % (node))
    return True

  if local_version != remote_version[0]:
    feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
                " node %s %s" % (local_version, node, remote_version[0]))
    return True

  # node seems compatible, we can actually try to look into its results

  bad = False

  # full package version (a mismatch is only a warning)
  if constants.RELEASE_VERSION != remote_version[1]:
    feedback_fn(" - WARNING: software version mismatch: master %s,"
                " node %s %s" %
                (constants.RELEASE_VERSION, node, remote_version[1]))

  # checks vg existence and size
  if vg_name is not None:
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

  # checks config file checksum

  remote_cksum = node_result.get(constants.NV_FILELIST, None)
  if not isinstance(remote_cksum, dict):
    bad = True
    feedback_fn(" - ERROR: node hasn't returned file checksum data")
  else:
    node_is_mc = nodeinfo.master_candidate
    for file_name in file_list:
      must_have_file = file_name not in master_files
      if file_name not in remote_cksum:
        if node_is_mc or must_have_file:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
      elif remote_cksum[file_name] != local_cksum[file_name]:
        if node_is_mc or must_have_file:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
        else:
          # not candidate and this is not a must-have file
          bad = True
          feedback_fn(" - ERROR: file '%s' should not exist on non master"
                      " candidates (and the file is outdated)" % file_name)
      else:
        # all good, except non-master/non-must have combination
        if not node_is_mc and not must_have_file:
          bad = True
          feedback_fn(" - ERROR: file '%s' should not exist on non master"
                      " candidates" % file_name)

  # checks ssh to any; other_node is used so that "node" keeps referring
  # to the node being verified

  if constants.NV_NODELIST not in node_result:
    bad = True
    feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
  else:
    ssh_failures = node_result[constants.NV_NODELIST]
    if ssh_failures:
      bad = True
      for other_node in ssh_failures:
        feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                    (other_node, ssh_failures[other_node]))

  if constants.NV_NODENETTEST not in node_result:
    bad = True
    feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
  else:
    tcp_failures = node_result[constants.NV_NODENETTEST]
    if tcp_failures:
      bad = True
      for other_node in utils.NiceSort(tcp_failures.keys()):
        feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                    (other_node, tcp_failures[other_node]))

  hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
  if isinstance(hyp_result, dict):
    for hv_name, hv_result in hyp_result.iteritems():
      if hv_result is not None:
        bad = True
        feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                    (hv_name, hv_result))

  # check used drbd list
  if vg_name is not None:
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
    if not isinstance(used_minors, (tuple, list)):
      bad = True
      feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
                  str(used_minors))
    else:
      for minor, (iname, must_exist) in drbd_map.items():
        if minor not in used_minors and must_exist:
          feedback_fn(" - ERROR: drbd minor %d of instance %s is"
                      " not active" % (minor, iname))
          bad = True
      for minor in used_minors:
        if minor not in drbd_map:
          feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
                      minor)
          bad = True

  return bad
819
820 - def _VerifyInstance(self, instance, instanceconfig, node_vol_is, 821 node_instance, feedback_fn, n_offline):
822 """Verify an instance. 823 824 This function checks to see if the required block devices are 825 available on the instance's node. 826 827 """ 828 bad = False 829 830 node_current = instanceconfig.primary_node 831 832 node_vol_should = {} 833 instanceconfig.MapLVsByNode(node_vol_should) 834 835 for node in node_vol_should: 836 if node in n_offline: 837 # ignore missing volumes on offline nodes 838 continue 839 for volume in node_vol_should[node]: 840 if node not in node_vol_is or volume not in node_vol_is[node]: 841 feedback_fn(" - ERROR: volume %s missing on node %s" % 842 (volume, node)) 843 bad = True 844 845 if instanceconfig.admin_up: 846 if ((node_current not in node_instance or 847 not instance in node_instance[node_current]) and 848 node_current not in n_offline): 849 feedback_fn(" - ERROR: instance %s not running on node %s" % 850 (instance, node_current)) 851 bad = True 852 853 for node in node_instance: 854 if (not node == node_current): 855 if instance in node_instance[node]: 856 feedback_fn(" - ERROR: instance %s should not run on node %s" % 857 (instance, node)) 858 bad = True 859 860 return bad
861
862 - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
863 """Verify if there are any unknown volumes in the cluster. 864 865 The .os, .swap and backup volumes are ignored. All other volumes are 866 reported as unknown. 867 868 """ 869 bad = False 870 871 for node in node_vol_is: 872 for volume in node_vol_is[node]: 873 if node not in node_vol_should or volume not in node_vol_should[node]: 874 feedback_fn(" - ERROR: volume %s on node %s should not exist" % 875 (volume, node)) 876 bad = True 877 return bad
878
879 - def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
880 """Verify the list of running instances. 881 882 This checks what instances are running but unknown to the cluster. 883 884 """ 885 bad = False 886 for node in node_instance: 887 for runninginstance in node_instance[node]: 888 if runninginstance not in instancelist: 889 feedback_fn(" - ERROR: instance %s on node %s should not exist" % 890 (runninginstance, node)) 891 bad = True 892 return bad
893
def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
  """Verify N+1 Memory Resilience.

  Check that if one single node dies we can still start all the instances it
  was primary for.

  @param node_info: mapping of node name to a dict holding at least the
      'mfree' value and the 'sinst-by-pnode' mapping (primary node to the
      list of instances for which this node is secondary)
  @param instance_cfg: mapping of instance name to instance object
  @param feedback_fn: function used to accumulate results
  @rtype: boolean
  @return: True if any node lacks the memory needed for failovers

  """
  bad = False
  # fetched once instead of once per instance; assumes the cluster
  # configuration does not change while the check runs
  cluster = self.cfg.GetClusterInfo()

  for node, nodeinfo in node_info.iteritems():
    # This code checks that every node which is now listed as secondary has
    # enough memory to host all instances it is supposed to should a single
    # other node in the cluster fail.
    # FIXME: not ready for failover to an arbitrary node
    # FIXME: does not support file-backed instances
    # WARNING: we currently take into account down instances as well as up
    # ones, considering that even if they're down someone might want to start
    # them even in the event of a node failure.
    for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
      needed_mem = 0
      for instance in instances:
        bep = cluster.FillBE(instance_cfg[instance])
        # only auto-balanced instances count towards the needed memory
        if bep[constants.BE_AUTO_BALANCE]:
          needed_mem += bep[constants.BE_MEMORY]
      if nodeinfo['mfree'] < needed_mem:
        feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                    " failovers should node %s fail" % (node, prinode))
        bad = True
  return bad
923
def CheckPrereq(self):
  """Check prerequisites.

  Converts the list of checks to skip (self.op.skip_checks) into a frozen
  set, stored as self.skip_set, and verifies that all of its members are
  listed in constants.VERIFY_OPTIONAL_CHECKS.

  @raise errors.OpPrereqError: if an unknown check was asked to be skipped

  """
  self.skip_set = frozenset(self.op.skip_checks)
  all_known = self.skip_set.issubset(constants.VERIFY_OPTIONAL_CHECKS)
  if not all_known:
    raise errors.OpPrereqError("Invalid checks to be skipped specified")
934
def BuildHooksEnv(self):
  """Build hooks env.

  Cluster-Verify hooks only run in the post phase, on all nodes; their
  failure makes the output be logged in the verify output and the
  verification fail. The environment exports the cluster tags and, for
  every node, that node's tags.

  @return: tuple of (env, pre-phase nodes (empty), post-phase nodes)

  """
  post_nodes = self.cfg.GetNodeList()
  cluster_tags = self.cfg.GetClusterInfo().GetTags()
  env = {"CLUSTER_TAGS": " ".join(cluster_tags)}
  env.update(dict([("NODE_TAGS_%s" % ninfo.name, " ".join(ninfo.GetTags()))
                   for ninfo in self.cfg.GetAllNodesInfo().values()]))
  return env, [], post_nodes
950
def Exec(self, feedback_fn):
  """Verify integrity of cluster, performing various test on nodes.

  The verification runs in stages, each reporting through C{feedback_fn}:
  global configuration checks, per-node checks (driven by a single
  C{node_verify} RPC), per-instance checks, orphan volumes, orphan
  instances, N+1 memory redundancy (unless skipped via C{self.skip_set})
  and finally a few informational notes.

  @param feedback_fn: function used to send feedback back to the caller
  @rtype: boolean
  @return: True if no error was found, False otherwise

  """
  bad = False
  feedback_fn("* Verifying global settings")
  for msg in self.cfg.VerifyConfig():
    feedback_fn(" - ERROR: %s" % msg)
    # a configuration error is reported as ERROR, so it must also make the
    # verification fail (it used to be printed only)
    bad = True

  vg_name = self.cfg.GetVGName()
  hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
  nodelist = utils.NiceSort(self.cfg.GetNodeList())
  nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
  instancelist = utils.NiceSort(self.cfg.GetInstanceList())
  instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                      for iname in instancelist)
  i_non_redundant = [] # Non redundant instances
  i_non_a_balanced = [] # Non auto-balanced instances
  n_offline = [] # List of offline nodes
  n_drained = [] # List of nodes being drained
  node_volume = {}
  node_instance = {}
  node_info = {}
  instance_cfg = {}

  # FIXME: verify OS list
  # do local checksums
  master_files = [constants.CLUSTER_CONF_FILE]

  file_names = ssconf.SimpleStore().GetFileList()
  file_names.append(constants.SSL_CERT_FILE)
  file_names.append(constants.RAPI_CERT_FILE)
  file_names.extend(master_files)

  local_checksums = utils.FingerprintFiles(file_names)

  feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
  node_verify_param = {
    constants.NV_FILELIST: file_names,
    constants.NV_NODELIST: [node.name for node in nodeinfo
                            if not node.offline],
    constants.NV_HYPERVISOR: hypervisors,
    constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                node.secondary_ip) for node in nodeinfo
                               if not node.offline],
    constants.NV_INSTANCELIST: hypervisors,
    constants.NV_VERSION: None,
    constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
  if vg_name is not None:
    node_verify_param[constants.NV_VGLIST] = None
    node_verify_param[constants.NV_LVLIST] = vg_name
    node_verify_param[constants.NV_DRBDLIST] = None
  all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                         self.cfg.GetClusterName())

  cluster = self.cfg.GetClusterInfo()
  master_node = self.cfg.GetMasterNode()
  all_drbd_map = self.cfg.ComputeDRBDMap()

  for node_i in nodeinfo:
    node = node_i.name
    nresult = all_nvinfo[node].data

    if node_i.offline:
      feedback_fn("* Skipping offline node %s" % (node,))
      n_offline.append(node)
      continue

    if node == master_node:
      ntype = "master"
    elif node_i.master_candidate:
      ntype = "master candidate"
    elif node_i.drained:
      ntype = "drained"
      n_drained.append(node)
    else:
      ntype = "regular"
    feedback_fn("* Verifying node %s (%s)" % (node, ntype))

    if all_nvinfo[node].failed or not isinstance(nresult, dict):
      feedback_fn(" - ERROR: connection to %s failed" % (node,))
      bad = True
      continue

    node_drbd = {}
    for minor, instance in all_drbd_map[node].items():
      if instance not in instanceinfo:
        feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
                    instance)
        # reported as an error, so it must fail the verification too
        bad = True
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)
    result = self._VerifyNode(node_i, file_names, local_checksums,
                              nresult, feedback_fn, master_files,
                              node_drbd, vg_name)
    bad = bad or result

    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      node_volume[node] = {}
    elif isinstance(lvdata, basestring):
      feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                  (node, utils.SafeEncode(lvdata)))
      bad = True
      node_volume[node] = {}
    elif not isinstance(lvdata, dict):
      feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
      bad = True
      continue
    else:
      node_volume[node] = lvdata

    # node_instance
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    if not isinstance(idata, list):
      feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                  (node,))
      bad = True
      continue

    node_instance[node] = idata

    # node_info; kept in its own name so it does not rebind the
    # 'nodeinfo' list this loop is iterating over
    hv_info = nresult.get(constants.NV_HVINFO, None)
    if not isinstance(hv_info, dict):
      feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
      bad = True
      continue

    try:
      node_info[node] = {
        "mfree": int(hv_info['memory_free']),
        "pinst": [],
        "sinst": [],
        # dictionary holding all instances this node is secondary for,
        # grouped by their primary node. Each key is a cluster node, and each
        # value is a list of instances which have the key as primary and the
        # current node as secondary. this is handy to calculate N+1 memory
        # availability if you can only failover from a primary to its
        # secondary.
        "sinst-by-pnode": {},
      }
      # FIXME: devise a free space model for file based instances as well
      if vg_name is not None:
        if (constants.NV_VGLIST not in nresult or
            vg_name not in nresult[constants.NV_VGLIST]):
          feedback_fn(" - ERROR: node %s didn't return data for the"
                      " volume group '%s' - it is either missing or broken" %
                      (node, vg_name))
          bad = True
          continue
        node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
    except (ValueError, KeyError):
      feedback_fn(" - ERROR: invalid nodeinfo value returned"
                  " from node %s" % (node,))
      bad = True
      continue

  node_vol_should = {}

  for instance in instancelist:
    feedback_fn("* Verifying instance %s" % instance)
    inst_config = instanceinfo[instance]
    result = self._VerifyInstance(instance, inst_config, node_volume,
                                  node_instance, feedback_fn, n_offline)
    bad = bad or result
    inst_nodes_offline = []

    inst_config.MapLVsByNode(node_vol_should)

    instance_cfg[instance] = inst_config

    pnode = inst_config.primary_node
    if pnode in node_info:
      node_info[pnode]['pinst'].append(instance)
    elif pnode not in n_offline:
      feedback_fn(" - ERROR: instance %s, connection to primary node"
                  " %s failed" % (instance, pnode))
      bad = True

    if pnode in n_offline:
      inst_nodes_offline.append(pnode)

    # If the instance is non-redundant we cannot survive losing its primary
    # node, so we are not N+1 compliant. On the other hand we have no disk
    # templates with more than one secondary so that situation is not well
    # supported either.
    # FIXME: does not support file-backed instances
    if len(inst_config.secondary_nodes) == 0:
      i_non_redundant.append(instance)
    elif len(inst_config.secondary_nodes) > 1:
      feedback_fn(" - WARNING: multiple secondaries for instance %s"
                  % instance)

    if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
      i_non_a_balanced.append(instance)

    for snode in inst_config.secondary_nodes:
      if snode in node_info:
        node_info[snode]['sinst'].append(instance)
        if pnode not in node_info[snode]['sinst-by-pnode']:
          node_info[snode]['sinst-by-pnode'][pnode] = []
        node_info[snode]['sinst-by-pnode'][pnode].append(instance)
      elif snode not in n_offline:
        feedback_fn(" - ERROR: instance %s, connection to secondary node"
                    " %s failed" % (instance, snode))
        bad = True
      if snode in n_offline:
        inst_nodes_offline.append(snode)

    if inst_nodes_offline:
      # warn that the instance lives on offline nodes, and set bad=True
      feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
                  ", ".join(inst_nodes_offline))
      bad = True

  feedback_fn("* Verifying orphan volumes")
  result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                     feedback_fn)
  bad = bad or result

  feedback_fn("* Verifying remaining instances")
  result = self._VerifyOrphanInstances(instancelist, node_instance,
                                       feedback_fn)
  bad = bad or result

  if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
    feedback_fn("* Verifying N+1 Memory redundancy")
    result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
    bad = bad or result

  feedback_fn("* Other Notes")
  if i_non_redundant:
    feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                % len(i_non_redundant))

  if i_non_a_balanced:
    feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                % len(i_non_a_balanced))

  if n_offline:
    feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))

  if n_drained:
    feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))

  return not bad
1203
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
  """Analyze the post-hooks' result

  This method analyses the hook result, handles it, and sends some
  nicely-formatted feedback back to the user. Only the post phase is
  examined; for any other phase C{lu_result} is returned unchanged.

  @param phase: one of L{constants.HOOKS_PHASE_POST} or
      L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
  @param hooks_results: the results of the multi-node hooks rpc call
  @param feedback_fn: function used send feedback back to the caller
  @param lu_result: previous Exec result (a true value means the
      verification succeeded)
  @return: the new Exec result, based on the previous result
      and hook results; False if the hooks could not be run on a
      non-offline node or if any hook script failed

  """
  # We only really run POST phase hooks, and are only interested in
  # their results
  if phase == constants.HOOKS_PHASE_POST:
    # Used to change hooks' output to proper indentation
    indent_re = re.compile('^', re.M)
    feedback_fn("* Hooks Results")
    if not hooks_results:
      feedback_fn(" - ERROR: general communication failure")
      # Exec returns 'not bad', i.e. a true value on success; a failure
      # must therefore be a false value (storing 1 here meant "success")
      lu_result = False
    else:
      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        if res.failed or not isinstance(res.data, list):
          if res.offline:
            # no need to warn or set fail return value
            continue
          feedback_fn(" Communication failure in hooks execution")
          lu_result = False
          continue
        for script, hkr, output in res.data:
          if hkr == constants.HKR_FAIL:
            # The node header is only shown once, if there are
            # failing hooks on that node
            if show_node_header:
              feedback_fn(" Node %s:" % node_name)
              show_node_header = False
            feedback_fn(" ERROR: Script %s failed, output:" % script)
            output = indent_re.sub(' ', output)
            feedback_fn("%s" % output)
            lu_result = False

  return lu_result
1252
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Lock all nodes and all instances, in shared mode.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Only instances that are administratively up and use a disk template in
    C{constants.DTS_NET_MIRROR} are examined.

    @return: a tuple (res_nodes, res_nlvm, res_instances, res_missing):
        - res_nodes: nodes (not marked offline) whose volume list could not
          be obtained, either because the RPC failed or returned bad data
        - res_nlvm: dict of node name to the LVM error string it returned
        - res_instances: names of instances having at least one LV that is
          not reported as online
        - res_missing: dict of instance name to the list of (node, volume)
          pairs that were not found in any returned volume list

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
          # an unreachable (but not offline) node belongs in the list of
          # problematic nodes, not only in the log
          res_nodes.append(node)
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, _, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1337
class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    """Compute the locks for the instances to examine.

    An empty C{instances} list selects every instance (and every node);
    otherwise the named instances are expanded and locked, and their nodes
    are computed later in L{DeclareLocks}. All locks are shared.

    @raise errors.OpPrereqError: if C{instances} is not a list or names an
        unknown instance

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if not self.op.instances:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    else:
      self.wanted_names = expanded = []
      for requested in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(requested)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % requested)
        expanded.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: expanded,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.share_locks = dict((level, 1) for level in locking.LEVELS)

  def DeclareLocks(self, level):
    """Lock the primary nodes of explicitly requested instances.

    """
    if level != locking.LEVEL_NODE or self.wanted_names is None:
      return
    self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    Resolves the instance objects to examine; when no explicit list was
    given, every instance whose lock was acquired is used.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    lookup = self.cfg.GetInstanceInfo
    self.wanted_instances = [lookup(iname) for iname in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Grow the data child of a DRBD8 disk to the parent's size if smaller.

    Only C{LD_DRBD8} disks are handled (others return False). The first
    child is fixed in place and the check then recurses into that child
    only, never into the metadata device.

    @param disk: an L{ganeti.objects.Disk} object
    @rtype: boolean
    @return: whether any size in this subtree was modified

    """
    if disk.dev_type != constants.LD_DRBD8:
      return False

    assert disk.children, "Empty children for DRBD8?"
    data_child = disk.children[0]
    too_small = data_child.size < disk.size
    if too_small:
      self.LogInfo("Child disk has size %d, parent %d, fixing",
                   data_child.size, disk.size)
      data_child.size = disk.size

    # the recursion is always evaluated, before looking at our own result
    return self._EnsureChildSizes(data_child) or too_small

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    Queries every primary node for the actual sizes of its instances'
    disks and records corrected sizes in the configuration. Sizes returned
    by the node are shifted right by 20 bits before comparing (presumably
    bytes to MiB — confirm against the blockdev_getsizes backend).

    @return: list of (instance name, disk index, new size) tuples, one per
        correction made

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for inst in self.wanted_instances:
      node_entries = per_node_disks.setdefault(inst.primary_node, [])
      node_entries.extend((inst, disk_idx, inst_disk)
                          for disk_idx, inst_disk in enumerate(inst.disks))

    changed = []
    for node, entries in per_node_disks.items():
      # work on copies so SetDiskID does not touch the config objects
      disk_copies = [entry[2].Copy() for entry in entries]
      for disk_copy in disk_copies:
        self.cfg.SetDiskID(disk_copy, node)
      answer = self.rpc.call_blockdev_getsizes(node, disk_copies)
      if answer.failed:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(answer.data) != len(entries):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for (inst, disk_idx, inst_disk), raw_size in zip(entries, answer.data):
        if raw_size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", disk_idx, inst.name)
          continue
        if not isinstance(raw_size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", disk_idx, inst.name)
          continue
        actual = raw_size >> 20
        if actual != inst_disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", disk_idx,
                       inst.name, inst_disk.size, actual)
          inst_disk.size = actual
          self.cfg.Update(inst)
          changed.append((inst.name, disk_idx, actual))
        if self._EnsureChildSizes(inst_disk):
          self.cfg.Update(inst)
          changed.append((inst.name, disk_idx, inst_disk.size))
    return changed
1459
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Exposes the current cluster name and the requested new name; the
    hooks run on the master node only, in both phases.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    master = self.cfg.GetMasterNode()
    return env, [master], [master]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    The name is resolved to a (name, IP) pair; at least one of them must
    differ from the current values, and a new IP must not already answer
    on the node daemon port.

    """
    resolved = utils.HostInfo(self.op.name)

    new_name = resolved.name
    new_ip = resolved.ip
    self.ip = new_ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()

    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if (new_ip != old_ip and
        utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT)):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
                                 new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, stores the new name and IP, rewrites the SSH
    known_hosts file and uploads it to all other nodes; the master role
    is always restarted afterwards, even if an earlier step raised.

    """
    new_cluster_name = self.op.name
    new_master_ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    stop_res = self.rpc.call_node_stop_master(master, False)
    if stop_res.failed or not stop_res.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = new_cluster_name
      cluster.master_ip = new_master_ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      other_nodes = self.cfg.GetNodeList()
      if master in other_nodes:
        other_nodes.remove(master)
      uploads = self.rpc.call_upload_file(other_nodes,
                                          constants.SSH_KNOWN_HOSTS_FILE)
      for dest_node, dest_res in uploads.iteritems():
        if dest_res.failed or not dest_res.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, dest_node)

    finally:
      start_res = self.rpc.call_node_start_master(master, False, False)
      if start_res.failed or not start_res.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")
1539
def _RecursiveCheckIfLVMBased(disk):
  """Tell whether a disk, or any disk below it, is an LVM logical volume.

  @type disk: L{objects.Disk}
  @param disk: the root of the disk tree to examine
  @rtype: boolean
  @return: True if the disk itself or any descendant has dev_type LD_LV

  """
  if disk.dev_type == constants.LD_LV:
    return True
  for child in disk.children or ():
    if _RecursiveCheckIfLVMBased(child):
      return True
  return False
1555
1556 1557 -class LUSetClusterParams(LogicalUnit):
1558 """Change the parameters of the cluster. 1559 1560 """ 1561 HPATH = "cluster-modify" 1562 HTYPE = constants.HTYPE_CLUSTER 1563 _OP_REQP = [] 1564 REQ_BGL = False 1565
1566 - def CheckArguments(self):
1567 """Check parameters 1568 1569 """ 1570 if not hasattr(self.op, "candidate_pool_size"): 1571 self.op.candidate_pool_size = None 1572 if self.op.candidate_pool_size is not None: 1573 try: 1574 self.op.candidate_pool_size = int(self.op.candidate_pool_size) 1575 except (ValueError, TypeError), err: 1576 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % 1577 str(err)) 1578 if self.op.candidate_pool_size < 1: 1579 raise errors.OpPrereqError("At least one master candidate needed")
1580
1581 - def ExpandNames(self):
1582 # FIXME: in the future maybe other cluster params won't require checking on 1583 # all nodes to be modified. 1584 self.needed_locks = { 1585 locking.LEVEL_NODE: locking.ALL_SET, 1586 } 1587 self.share_locks[locking.LEVEL_NODE] = 1
1588
1589 - def BuildHooksEnv(self):
1590 """Build hooks env. 1591 1592 """ 1593 env = { 1594 "OP_TARGET": self.cfg.GetClusterName(), 1595 "NEW_VG_NAME": self.op.vg_name, 1596 } 1597 mn = self.cfg.GetMasterNode() 1598 return env, [mn], [mn]
1599
1600 - def CheckPrereq(self):
1601 """Check prerequisites. 1602 1603 This checks whether the given params don't conflict and 1604 if the given volume group is valid. 1605 1606 """ 1607 if self.op.vg_name is not None and not self.op.vg_name: 1608 instances = self.cfg.GetAllInstancesInfo().values() 1609 for inst in instances: 1610 for disk in inst.disks: 1611 if _RecursiveCheckIfLVMBased(disk): 1612 raise errors.OpPrereqError("Cannot disable lvm storage while" 1613 " lvm-based instances exist") 1614 1615 node_list = self.acquired_locks[locking.LEVEL_NODE] 1616 1617 # if vg_name not None, checks given volume group on all nodes 1618 if self.op.vg_name: 1619 vglist = self.rpc.call_vg_list(node_list) 1620 for node in node_list: 1621 if vglist[node].failed: 1622 # ignoring down node 1623 self.LogWarning("Node %s unreachable/error, ignoring" % node) 1624 continue 1625 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data, 1626 self.op.vg_name, 1627 constants.MIN_VG_SIZE) 1628 if vgstatus: 1629 raise errors.OpPrereqError("Error on node '%s': %s" % 1630 (node, vgstatus)) 1631 1632 self.cluster = cluster = self.cfg.GetClusterInfo() 1633 # validate beparams changes 1634 if self.op.beparams: 1635 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) 1636 self.new_beparams = cluster.FillDict( 1637 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams) 1638 1639 # hypervisor list/parameters 1640 self.new_hvparams = cluster.FillDict(cluster.hvparams, {}) 1641 if self.op.hvparams: 1642 if not isinstance(self.op.hvparams, dict): 1643 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") 1644 for hv_name, hv_dict in self.op.hvparams.items(): 1645 if hv_name not in self.new_hvparams: 1646 self.new_hvparams[hv_name] = hv_dict 1647 else: 1648 self.new_hvparams[hv_name].update(hv_dict) 1649 1650 if self.op.enabled_hypervisors is not None: 1651 self.hv_list = self.op.enabled_hypervisors 1652 if not self.hv_list: 1653 raise errors.OpPrereqError("Enabled hypervisors list must contain 
at" 1654 " least one member") 1655 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES 1656 if invalid_hvs: 1657 raise errors.OpPrereqError("Enabled hypervisors contains invalid" 1658 " entries: %s" % invalid_hvs) 1659 else: 1660 self.hv_list = cluster.enabled_hypervisors 1661 1662 if self.op.hvparams or self.op.enabled_hypervisors is not None: 1663 # either the enabled list has changed, or the parameters have, validate 1664 for hv_name, hv_params in self.new_hvparams.items(): 1665 if ((self.op.hvparams and hv_name in self.op.hvparams) or 1666 (self.op.enabled_hypervisors and 1667 hv_name in self.op.enabled_hypervisors)): 1668 # either this is a new hypervisor, or its parameters have changed 1669 hv_class = hypervisor.GetHypervisor(hv_name) 1670 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 1671 hv_class.CheckParameterSyntax(hv_params) 1672 _CheckHVParams(self, node_list, hv_name, hv_params)
1673
1674 - def Exec(self, feedback_fn):
1675 """Change the parameters of the cluster. 1676 1677 """ 1678 if self.op.vg_name is not None: 1679 new_volume = self.op.vg_name 1680 if not new_volume: 1681 new_volume = None 1682 if new_volume != self.cfg.GetVGName(): 1683 self.cfg.SetVGName(new_volume) 1684 else: 1685 feedback_fn("Cluster LVM configuration already in desired" 1686 " state, not changing") 1687 if self.op.hvparams: 1688 self.cluster.hvparams = self.new_hvparams 1689 if self.op.enabled_hypervisors is not None: 1690 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors 1691 if self.op.beparams: 1692 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams 1693 if self.op.candidate_pool_size is not None: 1694 self.cluster.candidate_pool_size = self.op.candidate_pool_size 1695 # we need to update the pool size here, otherwise the save will fail 1696 _AdjustCandidatePool(self) 1697 1698 self.cfg.Update(self.cluster)
1699
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU: it writes the current cluster object back
  through C{cfg.Update} (which presumably pushes the configuration out to
  the nodes — confirm in the config module).

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Lock all nodes, in shared mode.

    """
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing needs checking for this LU.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    cluster_obj = self.cfg.GetClusterInfo()
    self.cfg.Update(cluster_obj)
1726
1727 1728 -def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1729 """Sleep and poll for an instance's disk to sync. 1730 1731 """ 1732 if not instance.disks: 1733 return True 1734 1735 if not oneshot: 1736 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name) 1737 1738 node = instance.primary_node 1739 1740 for dev in instance.disks: 1741 lu.cfg.SetDiskID(dev, node) 1742 1743 retries = 0 1744 degr_retries = 10 # in seconds, as we sleep 1 second each time 1745 while True: 1746 max_time = 0 1747 done = True 1748 cumul_degraded = False 1749 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks) 1750 if rstats.failed or not rstats.data: 1751 lu.LogWarning("Can't get any data from node %s", node) 1752 retries += 1 1753 if retries >= 10: 1754 raise errors.RemoteError("Can't contact node %s for mirror data," 1755 " aborting." % node) 1756 time.sleep(6) 1757 continue 1758 rstats = rstats.data 1759 retries = 0 1760 for i, mstat in enumerate(rstats): 1761 if mstat is None: 1762 lu.LogWarning("Can't compute data for node %s/%s", 1763 node, instance.disks[i].iv_name) 1764 continue 1765 # we ignore the ldisk parameter 1766 perc_done, est_time, is_degraded, _ = mstat 1767 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None) 1768 if perc_done is not None: 1769 done = False 1770 if est_time is not None: 1771 rem_time = "%d estimated seconds remaining" % est_time 1772 max_time = est_time 1773 else: 1774 rem_time = "no time estimate" 1775 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % 1776 (instance.disks[i].iv_name, perc_done, rem_time)) 1777 1778 # if we're done but degraded, let's do a few small retries, to 1779 # make sure we see a stable and not transient situation; therefore 1780 # we force restart of the loop 1781 if (done or oneshot) and cumul_degraded and degr_retries > 0: 1782 logging.info("Degraded disks found, %d retries left", degr_retries) 1783 degr_retries -= 1 1784 time.sleep(1) 1785 continue 1786 1787 if done or oneshot: 1788 break 1789 1790 time.sleep(min(60, max_time)) 
1791 1792 if done: 1793 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name) 1794 return not cumul_degraded
1795
1796 1797 -def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1798 """Check that mirrors are not degraded. 1799 1800 The ldisk parameter, if True, will change the test from the 1801 is_degraded attribute (which represents overall non-ok status for 1802 the device(s)) to the ldisk (representing the local storage status). 1803 1804 """ 1805 lu.cfg.SetDiskID(dev, node) 1806 if ldisk: 1807 idx = 6 1808 else: 1809 idx = 5 1810 1811 result = True 1812 if on_primary or dev.AssembleOnSecondary(): 1813 rstats = lu.rpc.call_blockdev_find(node, dev) 1814 msg = rstats.RemoteFailMsg() 1815 if msg: 1816 lu.LogWarning("Can't find disk on node %s: %s", node, msg) 1817 result = False 1818 elif not rstats.payload: 1819 lu.LogWarning("Can't find disk on node %s", node) 1820 result = False 1821 else: 1822 result = result and (not rstats.payload[idx]) 1823 if dev.children: 1824 for child in dev.children: 1825 result = result and _CheckDiskConsistency(lu, child, node, on_primary) 1826 1827 return result
1828
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    """Validate the query and declare (currently no) locks.

    @raise errors.OpPrereqError: if specific OS names were requested

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

    @param node_list: a list with the names of all nodes (accepted for
        interface compatibility; the per-node keys are taken from the
        nodes in C{rlist} whose RPC did not fail)
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    # only nodes that answered at RPC level are listed, so that a node with
    # a non-responding node daemon does not make every OS look invalid
    responding = [nname for nname in rlist if not rlist[nname].failed]
    by_os = {}
    for node_name, node_res in rlist.iteritems():
      if node_res.failed or not node_res.data:
        continue
      for os_obj in node_res.data:
        per_node = by_os.get(os_obj.name)
        if per_node is None:
          # start with an empty list for every responding node
          per_node = dict((nname, []) for nname in responding)
          by_os[os_obj.name] = per_node
        per_node[node_name].append(os_obj)
    return by_os

  @staticmethod
  def _ComputeOsField(field, os_name, os_data):
    """Compute the value of a single output field for one OS.

    @raise errors.ParameterError: for an unknown field name

    """
    if field == "name":
      return os_name
    if field == "valid":
      return utils.all([osl and osl[0] for osl in os_data.values()])
    if field == "node_status":
      status = {}
      for node_name, nos_list in os_data.iteritems():
        status[node_name] = [(v.status, v.path) for v in nos_list]
      return status
    raise errors.ParameterError(field)

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    @return: one row per OS, each row holding the requested fields in the
        order given by C{output_fields}

    """
    valid_nodes = list(self.cfg.GetOnlineNodeList())
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    per_os = self._DiagnoseByOS(valid_nodes, node_data)
    fields = self.op.output_fields
    rows = []
    for os_name, os_data in per_os.iteritems():
      rows.append([self._ComputeOsField(field, os_name, os_data)
                   for field in fields])
    return rows
1921
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The node being removed is excluded from both the pre and the post
    hook node lists. A failed node would otherwise be impossible to
    remove, since its pre hooks could never succeed.

    @return: tuple of (env, pre-hook nodes, post-hook nodes)

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it's not the master
     - it does not have primary or secondary instances

    On success the op's node_name is replaced by the expanded name and
    the node object is stored in self.node.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # use call syntax like every other raise in this module (the old
      # "raise X, args" statement form is obsolete)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # any instance using this node (as primary or secondary) blocks removal
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    The node is removed from the context first. The node is then asked
    to leave the cluster, and finally the master-candidate pool is
    adjusted.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    # NOTE(review): the RPC result is not checked, so a failure of the
    # node to leave the cluster goes unreported here
    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
1994
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  Returns one row per selected node, with one value per requested
  output field. Fields in _FIELDS_DYNAMIC are read from a call_node_info
  RPC result. All other fields are computed from the configuration.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  # fields whose values are read from the call_node_info RPC result
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  # fields computed from the cluster configuration alone
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    "role",
    )

  def ExpandNames(self):
    """Validate the requested fields and compute the needed locks.

    Node locks (shared) are only requested when the query needs live
    node data and the opcode asked for locking.

    """
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # truthy when some requested field is not static; NonMatching
    # presumably returns the selected fields outside the static set
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    @rtype: list
    @return: one row per node (sorted with utils.NiceSort), each row a
        list of values in the order of self.op.output_fields
    @raise errors.OpExecError: if explicitly requested nodes disappeared
        from the configuration (only checked when not locking)
    @raise errors.ParameterError: on an unknown output field

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      # without locks the configuration may have changed since ExpandNames
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          # TryConvert presumably returns the original value (e.g. None)
          # when the int() conversion fails
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          # failed/empty RPC answer: dynamic fields will report None
          live_data[name] = {}
    else:
      # NOTE: every key shares the same empty dict; this is safe only
      # because live_data values are never mutated below (only .get)
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # instance data is only loaded if an instance-related field is wanted
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for instance_name, inst in inst_data.items():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          # single-letter role; checks are ordered by precedence:
          # Master, Candidate, Drained, Offline, Regular
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
2163
class LUQueryNodeVolumes(NoHooksLU):
  """Report the logical volumes present on one or more nodes.

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    """Validate output fields and request shared locks on the nodes.

    An empty node list means all nodes are locked.

    """
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      wanted = _GetWantedNodes(self, self.op.nodes)
    else:
      wanted = locking.ALL_SET
    self.needed_locks[locking.LEVEL_NODE] = wanted

  def CheckPrereq(self):
    """Record the locked nodes as the set of nodes to query.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Build one row per volume per node.

    Nodes whose RPC answer is missing, failed or empty are skipped.
    Volumes of a node are ordered by their 'dev' key. Every cell is
    converted to a string.

    @raise errors.ParameterError: on an unknown output field

    """
    vol_rpc = self.rpc.call_node_volumes(self.nodes)

    instances = [self.cfg.GetInstanceInfo(inst_name)
                 for inst_name in self.cfg.GetInstanceList()]
    lvs_of = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    def _Owner(node, lv_name):
      # name of the first instance (in configuration order) that has
      # lv_name on node, or '-' when no instance claims it
      for inst in instances:
        per_node = lvs_of[inst]
        if node in per_node and lv_name in per_node[node]:
          return inst.name
      return '-'

    def _Cell(field, node, vol):
      # value of a single output field for one volume
      if field == "node":
        return node
      if field == "phys":
        return vol['dev']
      if field == "vg":
        return vol['vg']
      if field == "name":
        return vol['name']
      if field == "size":
        return int(float(vol['size']))
      if field == "instance":
        return _Owner(node, vol['name'])
      raise errors.ParameterError(field)

    rows = []
    for node in self.nodes:
      if node not in vol_rpc:
        continue
      node_result = vol_rpc[node]
      if node_result.failed or not node_result.data:
        continue
      for vol in sorted(node_result.data, key=lambda v: v['dev']):
        rows.append([str(_Cell(field, node, vol))
                     for field in self.op.output_fields])

    return rows
2244
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    The op's primary_ip and secondary_ip attributes used here are filled
    in by CheckPrereq.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config (or, for a re-add,
       that it is and that its IP addresses did not change)
     - it is resolvable
     - its IP addresses do not conflict with existing nodes
     - its parameters (single/dual homed) matches the cluster
     - it answers on the node daemon port (primary and secondary IP)

    It also decides whether the node becomes a master candidate and
    builds (or fetches, for re-adds) the node object in self.new_node.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a re-added node must keep exactly its previous addresses
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    # for a re-add the node itself is excluded from the current stats
    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
    # the new node will increase mc_max with one, so:
    mc_max = min(mc_max + 1, cp_size)
    self.master_candidate = mc_now < mc_max

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Steps: protocol version check, ssh key transfer, /etc/hosts update,
    secondary IP check, ssh/hostname verification from the master, file
    distribution and finally registration in the context.

    @raise errors.OpExecError: on any fatal step failure (failures to
        distribute files are only logged)

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      # a non-empty 'nodelist' answer maps failing nodes to error messages
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
      to_copy.append(constants.VNC_PASSWORD_FILE)

    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      # check the payload, as in the loop above; testing the result
      # object itself would not detect a failed upload
      if result[node].failed or not result[node].data:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      self.context.AddNode(new_node)
2492
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  Supported flags are offline, drained and master_candidate; each is
  either None (unchanged), True or False.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Expand the node name and validate the requested flag changes.

    @raise errors.OpPrereqError: for an unknown node, when no flag is
        given, or when more than one flag is set to True

    """
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    # presumably these validate that each flag is a boolean or None
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node and on the node being modified.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that:
     - the master node's flags are not being changed
     - demoting a master candidate (directly, or via offline/drained)
       does not drop the pool to or below candidate_pool_size, unless
       the op's force attribute is set (then only a warning is logged)
     - a node that is (and stays) offline or drained is not promoted to
       master candidate

    The node object is stored in self.node.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover")

    # any of these changes removes the node from the candidate pool
    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    # promotion is refused unless the blocking flag is cleared in the
    # same operation
    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    @rtype: list
    @return: list of (parameter, new value/description) tuples, in the
        order the changes were applied

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        # offline implies not a candidate and not drained; no demote RPC
        # is issued in this branch
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        # ask the node itself to demote; a failure is only a warning
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        # drained implies not a candidate and not offline
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.RemoteFailMsg()
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
2627
class LUQueryClusterInfo(NoHooksLU):
  """Return general information about the cluster and this software.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # a pure configuration read: no locks are requested
    self.needed_locks = {}

  def CheckPrereq(self):
    """Nothing needs to be verified before running this LU.

    """

  def Exec(self, feedback_fn):
    """Collect version, platform and cluster configuration data.

    Hypervisor parameters are reported only for enabled hypervisors.

    @rtype: dict
    @return: mapping of information keys to values

    """
    cluster = self.cfg.GetClusterInfo()
    arch_info = (platform.architecture()[0], platform.machine())
    enabled_hv_params = dict([(hv_name, cluster.hvparams[hv_name])
                              for hv_name in cluster.enabled_hypervisors])

    info = {}
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["architecture"] = arch_info
    info["name"] = cluster.cluster_name
    info["master"] = cluster.master_node
    info["default_hypervisor"] = cluster.default_hypervisor
    info["enabled_hypervisors"] = cluster.enabled_hypervisors
    info["hvparams"] = enabled_hv_params
    info["beparams"] = cluster.beparams
    info["candidate_pool_size"] = cluster.candidate_pool_size
    info["default_bridge"] = cluster.default_bridge
    info["master_netdev"] = cluster.master_netdev
    info["volume_group_name"] = cluster.volume_group_name
    info["file_storage_dir"] = cluster.file_storage_dir
    info["tags"] = list(cluster.GetTags())
    return info
2673
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  # output_fields is read unconditionally in ExpandNames and Exec, so it
  # is declared as required, as the other query LUs do for their fields
  _OP_REQP = ["output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    """Validate the requested fields; no locks are needed.

    """
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the values of the requested configuration fields.

    @rtype: list
    @return: one value per entry of output_fields, in the same order;
        drain_flag is True when the job queue drain file exists
    @raise errors.ParameterError: on an unknown field

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
2713
class LUActivateInstanceDisks(NoHooksLU):
  """Assemble (activate) the block devices of an instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later in DeclareLocks and replace this list
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Load the locked instance and require its primary node online.

    The optional op attribute ignore_size defaults to False.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    self.op.ignore_size = getattr(self.op, "ignore_size", False)

  def Exec(self, feedback_fn):
    """Assemble the disks and return the device mapping.

    @raise errors.OpExecError: if the disks could not be activated

    """
    ok, mapping = _AssembleInstanceDisks(self, self.instance,
                                         ignore_size=self.op.ignore_size)
    if ok:
      return mapping
    raise errors.OpExecError("Cannot activate block devices")
2755
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes: first every node of
  every disk in secondary mode, then the primary node again in primary
  mode.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: (disks_ok, device_info), where disks_ok is False if any
      counted assembly failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices (one per disk, from
      the primary-node pass)

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        # work on a copy so the configured disk keeps its size
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
      if msg:
        # primary-node failures always count, regardless of flags
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    # NOTE: appended once per disk even when the primary assembly failed;
    # result is then the last RPC result of the inner loop
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
2835
def _StartInstanceDisks(lu, instance, force):
  """Assemble an instance's disks, rolling back on failure.

  If assembly fails, the disks are shut down again and an error is
  raised. When force is explicitly false (not None), a hint about
  retrying with '--force' is logged first. force is also passed as
  ignore_secondaries to the assembly.

  @raise errors.OpExecError: if the disks could not be assembled

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if disks_ok:
    return

  _ShutdownInstanceDisks(lu, instance)
  if not (force is None or force):
    lu.proc.LogWarning("", hint="If the message above refers to a"
                       " secondary node,"
                       " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
2850
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shut down the block devices of an instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later in DeclareLocks and replace this list
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Load the (already locked) instance from the configuration.

    """
    wanted_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(wanted_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % wanted_name

  def Exec(self, feedback_fn):
    """Shut the disks down, refusing if the instance is running.

    """
    _SafeShutdownInstanceDisks(self, self.instance)
2884
def _SafeShutdownInstanceDisks(lu, instance):
  """Shut down an instance's block devices if the instance is not running.

  The primary node is asked for its instance list first. The shutdown
  is delegated to _ShutdownInstanceDisks only when the instance is not
  in that list.

  @raise errors.OpExecError: if the primary node cannot be queried or
      the instance is running

  """
  pnode = instance.primary_node
  answer = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  if answer.failed or not isinstance(answer.data, list):
    raise errors.OpExecError("Can't contact node '%s'" % pnode)

  if instance.name in answer.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
2905
2906 2907 -def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2908 """Shutdown block devices of an instance. 2909 2910 This does the shutdown on all nodes of the instance. 2911 2912 If the ignore_primary is false, errors on the primary node are 2913 ignored. 2914 2915 """ 2916 all_result = True 2917 for disk in instance.disks: 2918 for node, top_disk in disk.ComputeNodeTree(instance.primary_node): 2919 lu.cfg.SetDiskID(top_disk, node) 2920 result = lu.rpc.call_blockdev_shutdown(node, top_disk) 2921 msg = result.RemoteFailMsg() 2922 if msg: 2923 lu.LogWarning("Could not shutdown block device %s on node %s: %s", 2924 disk.iv_name, node, msg) 2925 if not ignore_primary or node != instance.primary_node: 2926 all_result = False 2927 return all_result
2928
2929 2930 -def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2931 """Checks if a node has enough free memory. 2932 2933 This function check if a given node has the needed amount of free 2934 memory. In case the node has less memory or we cannot get the 2935 information from the node, this function raise an OpPrereqError 2936 exception. 2937 2938 @type lu: C{LogicalUnit} 2939 @param lu: a logical unit from which we get configuration data 2940 @type node: C{str} 2941 @param node: the node to check 2942 @type reason: C{str} 2943 @param reason: string to use in the error message 2944 @type requested: C{int} 2945 @param requested: the amount of memory in MiB to check for 2946 @type hypervisor_name: C{str} 2947 @param hypervisor_name: the hypervisor to ask for memory stats 2948 @raise errors.OpPrereqError: if the node doesn't have enough memory, or 2949 we cannot check the node 2950 2951 """ 2952 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) 2953 nodeinfo[node].Raise() 2954 free_mem = nodeinfo[node].data.get('memory_free') 2955 if not isinstance(free_mem, int): 2956 raise errors.OpPrereqError("Can't compute free memory on node %s, result" 2957 " was '%s'" % (node, free_mem)) 2958 if requested > free_mem: 2959 raise errors.OpPrereqError("Not enough memory on node %s for %s:" 2960 " needed %s MiB, available %s MiB" % 2961 (node, reason, requested, free_mem))
2962
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  Optional C{beparams}/C{hvparams} overrides on the opcode are
  validated in L{CheckPrereq} and passed to the start RPC in L{Exec}.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand and lock the instance name.

    """
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, validates the
    optional beparams/hvparams overrides, checks that the primary node
    is online and that the instance's bridges exist. If the instance
    does not appear to be running, it also checks that the primary
    node has enough free memory, taking any memory override into
    account.

    @raise errors.OpPrereqError: if an override is not a dict, or if
        the free memory check fails

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
        "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams (overrides passed to the start RPC in Exec)
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # validate the types of the override values
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams (overrides passed to the start RPC in Exec)
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if self.beparams:
      # Exec starts the instance with these overrides, so the memory
      # check below must use them too (e.g. an increased memory size);
      # work on a copy so the filled dict is not modified in place
      bep = dict(bep)
      bep.update(self.beparams)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
      # presumably empty when the instance is not running on the node,
      # in which case starting it needs free memory
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    The instance is marked up in the configuration first. Then its
    disks are activated and it is started on the primary node. If the
    start fails, the disks are shut down again.

    @raise errors.OpExecError: if the start RPC fails

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
3061
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  Soft and hard reboots are a single reboot RPC to the primary node. A
  full reboot is a shutdown, a disk deactivation/reactivation and a
  fresh start.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the requested reboot type and lock the instance.

    @raise errors.ParameterError: for an unknown reboot type

    """
    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if self.op.reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run on the master node plus every node of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    hook_nodes = [self.cfg.GetMasterNode()]
    hook_nodes.extend(self.instance.all_nodes)
    return env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    Looks up the locked instance, then verifies that its primary node
    is online and that its bridges exist.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = inst
    assert inst is not None, \
        "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, inst.primary_node)
    _CheckInstanceBridgesExist(self, inst)

  def Exec(self, feedback_fn):
    """Reboot the instance and mark it up in the configuration.

    @raise errors.OpExecError: if any of the reboot, shutdown or start
        RPCs fails

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    pnode = instance.primary_node

    if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD):
      # in-place reboot, handled by the primary node
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, pnode)
      msg = self.rpc.call_instance_reboot(pnode, instance,
                                          reboot_type).RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      # full reboot: stop everything, then start from scratch
      msg = self.rpc.call_instance_shutdown(pnode, instance).RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      msg = self.rpc.call_instance_start(pnode, instance,
                                         None, None).RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
3146
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand and lock the instance name.

    """
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its
    primary node is online.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
        "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    The instance is marked down in the configuration before the
    shutdown RPC is issued. A failed shutdown is only reported as a
    warning, and the instance's disks are shut down in either case.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)
3194
3195 3196 -class LUReinstallInstance(LogicalUnit):
3197 """Reinstall an instance. 3198 3199 """ 3200 HPATH = "instance-reinstall" 3201 HTYPE = constants.HTYPE_INSTANCE 3202