
Source Code for Module ganeti.cmdlib.node

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


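# Illustrative sketch of the promotion rule above, isolated from the real
# lu/cfg objects so it can be reasoned about with plain numbers. The helper
# below is hypothetical and not used by the logical units in this module.
def _PromotionRuleExample(candidate_pool_size, mc_now, mc_should):
  # Adding a node raises the desired candidate count by one, capped at the
  # configured pool size; promote only while the current count is below that.
  return mc_now < min(mc_should + 1, candidate_pool_size)

# _PromotionRuleExample(10, 3, 3) -> True  (pool still has room)
# _PromotionRuleExample(3, 3, 3)  -> False (pool is already full)

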
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We adding a new node so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else:  # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


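# Illustrative sketch (hypothetical helper, not used by the logical units in
# this module): the three filters above only differ in the membership test
# applied to each instance object.
def _NodeInstanceFilterExample(instances, node_uuid):
  # Returns (primary, secondary, all) instance lists for node_uuid, assuming
  # each instance exposes primary_node, secondary_nodes and all_nodes the way
  # objects.Instance does.
  primary = [i for i in instances if node_uuid == i.primary_node]
  secondary = [i for i in instances if node_uuid in i.secondary_nodes]
  both = [i for i in instances if node_uuid in i.all_nodes]
  return (primary, secondary, both)

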
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " are '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


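# Illustrative sketch of the special case above, using a hypothetical stand-in
# config object (the paths are made up; real values come from the cluster
# configuration). For constants.ST_FILE the RPC layer receives one extra
# argument holding both storage directories; other storage types get no
# extra arguments.
class _FakeStorageConfigExample(object):
  def GetFileStorageDir(self):
    return "/srv/ganeti/file-storage"         # made-up path

  def GetSharedFileStorageDir(self):
    return "/srv/ganeti/shared-file-storage"  # made-up path

# _GetStorageTypeArgs(_FakeStorageConfigExample(), constants.ST_FILE)
#   -> [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]]
# _GetStorageTypeArgs(_FakeStorageConfigExample(), constants.ST_LVM_VG)
#   -> []

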
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
      raw_storage_units = utils.storage.GetStorageUnits(
          lu.cfg, [default_template])
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


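# Illustrative usage of the check above, with hypothetical field names: the
# valid fields are described by a utils.FieldSet, the selection is a plain
# list of names, and any selected name not matched by the set aborts the
# opcode with OpPrereqError.
#
#   valid_fields = utils.FieldSet("name", "size")
#   _CheckOutputFields(valid_fields, ["name"])           # passes silently
#   _CheckOutputFields(valid_fields, ["name", "bogus"])  # raises OpPrereqError

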
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

1650
1651 - def Exec(self, feedback_fn):
1652 feedback_fn("Repairing storage unit '%s' on %s ..." % 1653 (self.op.name, self.op.node_name)) 1654 1655 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1656 result = self.rpc.call_storage_execute(self.op.node_uuid, 1657 self.op.storage_type, st_args, 1658 self.op.name, 1659 constants.SO_FIX_CONSISTENCY) 1660 result.Raise("Failed to repair storage unit '%s' on %s" % 1661 (self.op.name, self.op.node_name))
1662