
Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes 
  57   
  58   
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should

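# Illustrative note (numbers are an example, not from the module): with
# candidate_pool_size = 10, three candidates at the moment and a target of
# three before the addition, the target becomes min(3 + 1, 10) = 4, so
# 3 < 4 and the node promotes itself.
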
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)

100 -class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster. 102 103 """ 104 HPATH = "node-add" 105 HTYPE = constants.HTYPE_NODE 106 _NFLAGS = ["master_capable", "vm_capable"] 107
108 - def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily() 110 # validate/normalize the node name 111 self.hostname = netutils.GetHostname(name=self.op.node_name, 112 family=self.primary_ip_family) 113 self.op.node_name = self.hostname.name 114 115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName(): 116 raise errors.OpPrereqError("Cannot readd the master node", 117 errors.ECODE_STATE) 118 119 if self.op.readd and self.op.group: 120 raise errors.OpPrereqError("Cannot pass a node group when a node is" 121 " being readded", errors.ECODE_INVAL)
122
123 - def BuildHooksEnv(self):
124 """Build hooks env. 125 126 This will run on all nodes before, and on all nodes + the new node after. 127 128 """ 129 return { 130 "OP_TARGET": self.op.node_name, 131 "NODE_NAME": self.op.node_name, 132 "NODE_PIP": self.op.primary_ip, 133 "NODE_SIP": self.op.secondary_ip, 134 "MASTER_CAPABLE": str(self.op.master_capable), 135 "VM_CAPABLE": str(self.op.vm_capable), 136 }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

154 - def CheckPrereq(self):
155 """Check prerequisites. 156 157 This checks: 158 - the new node is not already in the config 159 - it is resolvable 160 - its parameters (single/dual homed) matches the cluster 161 162 Any errors are signaled by raising errors.OpPrereqError. 163 164 """ 165 node_name = self.hostname.name 166 self.op.primary_ip = self.hostname.ip 167 if self.op.secondary_ip is None: 168 if self.primary_ip_family == netutils.IP6Address.family: 169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid" 170 " IPv4 address must be given as secondary", 171 errors.ECODE_INVAL) 172 self.op.secondary_ip = self.op.primary_ip 173 174 secondary_ip = self.op.secondary_ip 175 if not netutils.IP4Address.IsValid(secondary_ip): 176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 177 " address" % secondary_ip, errors.ECODE_INVAL) 178 179 existing_node_info = self.cfg.GetNodeInfoByName(node_name) 180 if not self.op.readd and existing_node_info is not None: 181 raise errors.OpPrereqError("Node %s is already in the configuration" % 182 node_name, errors.ECODE_EXISTS) 183 elif self.op.readd and existing_node_info is None: 184 raise errors.OpPrereqError("Node %s is not in the configuration" % 185 node_name, errors.ECODE_NOENT) 186 187 self.changed_primary_ip = False 188 189 for existing_node in self.cfg.GetAllNodesInfo().values(): 190 if self.op.readd and node_name == existing_node.name: 191 if existing_node.secondary_ip != secondary_ip: 192 raise errors.OpPrereqError("Readded node doesn't have the same IP" 193 " address configuration as before", 194 errors.ECODE_INVAL) 195 if existing_node.primary_ip != self.op.primary_ip: 196 self.changed_primary_ip = True 197 198 continue 199 200 if (existing_node.primary_ip == self.op.primary_ip or 201 existing_node.secondary_ip == self.op.primary_ip or 202 existing_node.primary_ip == secondary_ip or 203 existing_node.secondary_ip == secondary_ip): 204 raise errors.OpPrereqError("New node ip address(es) conflict with" 205 " existing node %s" % existing_node.name, 206 errors.ECODE_NOTUNIQUE) 207 208 # After this 'if' block, None is no longer a valid value for the 209 # _capable op attributes 210 if self.op.readd: 211 assert existing_node_info is not None, \ 212 "Can't retrieve locked node %s" % node_name 213 for attr in self._NFLAGS: 214 if getattr(self.op, attr) is None: 215 setattr(self.op, attr, getattr(existing_node_info, attr)) 216 else: 217 for attr in self._NFLAGS: 218 if getattr(self.op, attr) is None: 219 setattr(self.op, attr, True) 220 221 if self.op.readd and not self.op.vm_capable: 222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid) 223 if pri or sec: 224 raise errors.OpPrereqError("Node %s being re-added with vm_capable" 225 " flag set to false, but it already holds" 226 " instances" % node_name, 227 errors.ECODE_STATE) 228 229 # check that the type of the node (single versus dual homed) is the 230 # same as for the master 231 myself = self.cfg.GetMasterNodeInfo() 232 master_singlehomed = myself.secondary_ip == myself.primary_ip 233 newbie_singlehomed = secondary_ip == self.op.primary_ip 234 if master_singlehomed != newbie_singlehomed: 235 if master_singlehomed: 236 raise errors.OpPrereqError("The master has no secondary ip but the" 237 " new node has one", 238 errors.ECODE_INVAL) 239 else: 240 raise errors.OpPrereqError("The master has a secondary ip but the" 241 " new node doesn't have one", 242 errors.ECODE_INVAL) 243 244 # checks reachability 245 if not netutils.TcpPing(self.op.primary_ip, 
constants.DEFAULT_NODED_PORT): 246 raise errors.OpPrereqError("Node not reachable by ping", 247 errors.ECODE_ENVIRON) 248 249 if not newbie_singlehomed: 250 # check reachability from my secondary ip to newbie's secondary ip 251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, 252 source=myself.secondary_ip): 253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 254 " based ping to node daemon port", 255 errors.ECODE_ENVIRON) 256 257 if self.op.readd: 258 exceptions = [existing_node_info.uuid] 259 else: 260 exceptions = [] 261 262 if self.op.master_capable: 263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) 264 else: 265 self.master_candidate = False 266 267 self.node_group = None 268 if self.op.readd: 269 self.new_node = existing_node_info 270 self.node_group = existing_node_info.group 271 else: 272 self.node_group = self.cfg.LookupNodeGroup(self.op.group) 273 self.new_node = objects.Node(name=node_name, 274 primary_ip=self.op.primary_ip, 275 secondary_ip=secondary_ip, 276 master_candidate=self.master_candidate, 277 offline=False, drained=False, 278 group=self.node_group, ndparams={}) 279 280 if self.op.ndparams: 281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 283 "node", "cluster or group") 284 285 if self.op.hv_state: 286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) 287 288 if self.op.disk_state: 289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) 290 291 # TODO: If we need to have multiple DnsOnlyRunner we probably should make 292 # it a property on the base class. 293 rpcrunner = rpc.DnsOnlyRunner() 294 result = rpcrunner.call_version([node_name])[node_name] 295 result.Raise("Can't get version information from node %s" % node_name, 296 prereq=True) 297 if constants.PROTOCOL_VERSION == result.payload: 298 logging.info("Communication to node %s fine, sw version %s match", 299 node_name, result.payload) 300 else: 301 raise errors.OpPrereqError("Version mismatch master version %s," 302 " node version %s" % 303 (constants.PROTOCOL_VERSION, result.payload), 304 errors.ECODE_ENVIRON) 305 306 vg_name = self.cfg.GetVGName() 307 if vg_name is not None: 308 vparams = {constants.NV_PVLIST: [vg_name]} 309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node) 310 cname = self.cfg.GetClusterName() 311 result = rpcrunner.call_node_verify_light( 312 [node_name], vparams, cname, 313 self.cfg.GetClusterInfo().hvparams, 314 {node_name: self.node_group}, 315 self.cfg.GetAllNodeGroupsInfoDict() 316 )[node_name] 317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) 318 if errmsgs: 319 raise errors.OpPrereqError("Checks on node PVs failed: %s" % 320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
322 - def _InitOpenVSwitch(self):
323 filled_ndparams = self.cfg.GetClusterInfo().FillND( 324 self.new_node, self.cfg.GetNodeGroup(self.new_node.group)) 325 326 ovs = filled_ndparams.get(constants.ND_OVS, None) 327 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None) 328 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None) 329 330 if ovs: 331 if not ovs_link: 332 self.LogInfo("No physical interface for OpenvSwitch was given." 333 " OpenvSwitch will not have an outside connection. This" 334 " might not be what you want.") 335 336 result = self.rpc.call_node_configure_ovs( 337 self.new_node.name, ovs_name, ovs_link) 338 result.Raise("Failed to initialize OpenVSwitch on new node")
339
340 - def Exec(self, feedback_fn):
341 """Adds the new node to the cluster. 342 343 """ 344 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 345 "Not owning BGL" 346 347 # We adding a new node so we assume it's powered 348 self.new_node.powered = True 349 350 # for re-adds, reset the offline/drained/master-candidate flags; 351 # we need to reset here, otherwise offline would prevent RPC calls 352 # later in the procedure; this also means that if the re-add 353 # fails, we are left with a non-offlined, broken node 354 if self.op.readd: 355 self.new_node.offline = False 356 self.new_node.drained = False 357 self.LogInfo("Readding a node, the offline/drained flags were reset") 358 # if we demote the node, we do cleanup later in the procedure 359 self.new_node.master_candidate = self.master_candidate 360 if self.changed_primary_ip: 361 self.new_node.primary_ip = self.op.primary_ip 362 363 # copy the master/vm_capable flags 364 for attr in self._NFLAGS: 365 setattr(self.new_node, attr, getattr(self.op, attr)) 366 367 # notify the user about any possible mc promotion 368 if self.new_node.master_candidate: 369 self.LogInfo("Node will be a master candidate") 370 371 if self.op.ndparams: 372 self.new_node.ndparams = self.op.ndparams 373 else: 374 self.new_node.ndparams = {} 375 376 if self.op.hv_state: 377 self.new_node.hv_state_static = self.new_hv_state 378 379 if self.op.disk_state: 380 self.new_node.disk_state_static = self.new_disk_state 381 382 # Add node to our /etc/hosts, and add key to known_hosts 383 if self.cfg.GetClusterInfo().modify_etc_hosts: 384 master_node = self.cfg.GetMasterNode() 385 result = self.rpc.call_etc_hosts_modify( 386 master_node, constants.ETC_HOSTS_ADD, self.hostname.name, 387 self.hostname.ip) 388 result.Raise("Can't update hosts file with new host data") 389 390 if self.new_node.secondary_ip != self.new_node.primary_ip: 391 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip, 392 False) 393 394 node_verifier_uuids = [self.cfg.GetMasterNode()] 395 node_verify_param = { 396 constants.NV_NODELIST: ([self.new_node.name], {}), 397 # TODO: do a node-net-test as well? 
398 } 399 400 result = self.rpc.call_node_verify( 401 node_verifier_uuids, node_verify_param, 402 self.cfg.GetClusterName(), 403 self.cfg.GetClusterInfo().hvparams, 404 {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)}, 405 self.cfg.GetAllNodeGroupsInfoDict() 406 ) 407 for verifier in node_verifier_uuids: 408 result[verifier].Raise("Cannot communicate with node %s" % verifier) 409 nl_payload = result[verifier].payload[constants.NV_NODELIST] 410 if nl_payload: 411 for failed in nl_payload: 412 feedback_fn("ssh/hostname verification failed" 413 " (checking from %s): %s" % 414 (verifier, nl_payload[failed])) 415 raise errors.OpExecError("ssh/hostname verification failed") 416 417 self._InitOpenVSwitch() 418 419 if self.op.readd: 420 self.context.ReaddNode(self.new_node) 421 RedistributeAncillaryFiles(self) 422 # make sure we redistribute the config 423 self.cfg.Update(self.new_node, feedback_fn) 424 # and make sure the new node will not have old files around 425 if not self.new_node.master_candidate: 426 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid) 427 result.Warn("Node failed to demote itself from master candidate status", 428 self.LogWarning) 429 else: 430 self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId()) 431 RedistributeAncillaryFiles(self) 432 433 # We create a new certificate even if the node is readded 434 digest = GetClientCertDigest(self, self.new_node.uuid) 435 if self.new_node.master_candidate: 436 self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest) 437 else: 438 self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None) 439 440 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
441 442
443 -class LUNodeSetParams(LogicalUnit):
444 """Modifies the parameters of a node. 445 446 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 447 to the node role (as _ROLE_*) 448 @cvar _R2F: a dictionary from node role to tuples of flags 449 @cvar _FLAGS: a list of attribute names corresponding to the flags 450 451 """ 452 HPATH = "node-modify" 453 HTYPE = constants.HTYPE_NODE 454 REQ_BGL = False 455 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 456 _F2R = { 457 (True, False, False): _ROLE_CANDIDATE, 458 (False, True, False): _ROLE_DRAINED, 459 (False, False, True): _ROLE_OFFLINE, 460 (False, False, False): _ROLE_REGULAR, 461 } 462 _R2F = dict((v, k) for k, v in _F2R.items()) 463 _FLAGS = ["master_candidate", "drained", "offline"] 464
465 - def CheckArguments(self):
466 (self.op.node_uuid, self.op.node_name) = \ 467 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 468 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 469 self.op.master_capable, self.op.vm_capable, 470 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 471 self.op.disk_state] 472 if all_mods.count(None) == len(all_mods): 473 raise errors.OpPrereqError("Please pass at least one modification", 474 errors.ECODE_INVAL) 475 if all_mods.count(True) > 1: 476 raise errors.OpPrereqError("Can't set the node into more than one" 477 " state at the same time", 478 errors.ECODE_INVAL) 479 480 # Boolean value that tells us whether we might be demoting from MC 481 self.might_demote = (self.op.master_candidate is False or 482 self.op.offline is True or 483 self.op.drained is True or 484 self.op.master_capable is False) 485 486 if self.op.secondary_ip: 487 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 488 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 489 " address" % self.op.secondary_ip, 490 errors.ECODE_INVAL) 491 492 self.lock_all = self.op.auto_promote and self.might_demote 493 self.lock_instances = self.op.secondary_ip is not None
494
495 - def _InstanceFilter(self, instance):
496 """Filter for getting affected instances. 497 498 """ 499 return (instance.disk_template in constants.DTS_INT_MIRROR and 500 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
501
502 - def ExpandNames(self):
503 if self.lock_all: 504 self.needed_locks = { 505 locking.LEVEL_NODE: locking.ALL_SET, 506 507 # Block allocations when all nodes are locked 508 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 509 } 510 else: 511 self.needed_locks = { 512 locking.LEVEL_NODE: self.op.node_uuid, 513 } 514 515 # Since modifying a node can have severe effects on currently running 516 # operations the resource lock is at least acquired in shared mode 517 self.needed_locks[locking.LEVEL_NODE_RES] = \ 518 self.needed_locks[locking.LEVEL_NODE] 519 520 # Get all locks except nodes in shared mode; they are not used for anything 521 # but read-only access 522 self.share_locks = ShareAll() 523 self.share_locks[locking.LEVEL_NODE] = 0 524 self.share_locks[locking.LEVEL_NODE_RES] = 0 525 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 526 527 if self.lock_instances: 528 self.needed_locks[locking.LEVEL_INSTANCE] = \ 529 self.cfg.GetInstanceNames( 530 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
531
532 - def BuildHooksEnv(self):
533 """Build hooks env. 534 535 This runs on the master node. 536 537 """ 538 return { 539 "OP_TARGET": self.op.node_name, 540 "MASTER_CANDIDATE": str(self.op.master_candidate), 541 "OFFLINE": str(self.op.offline), 542 "DRAINED": str(self.op.drained), 543 "MASTER_CAPABLE": str(self.op.master_capable), 544 "VM_CAPABLE": str(self.op.vm_capable), 545 }
546
547 - def BuildHooksNodes(self):
548 """Build hooks nodes. 549 550 """ 551 nl = [self.cfg.GetMasterNode(), self.op.node_uuid] 552 return (nl, nl)
553
554 - def CheckPrereq(self):
555 """Check prerequisites. 556 557 This only checks the instance list against the existing names. 558 559 """ 560 node = self.cfg.GetNodeInfo(self.op.node_uuid) 561 if self.lock_instances: 562 affected_instances = \ 563 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 564 565 # Verify instance locks 566 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 567 wanted_instance_names = frozenset([inst.name for inst in 568 affected_instances.values()]) 569 if wanted_instance_names - owned_instance_names: 570 raise errors.OpPrereqError("Instances affected by changing node %s's" 571 " secondary IP address have changed since" 572 " locks were acquired, wanted '%s', have" 573 " '%s'; retry the operation" % 574 (node.name, 575 utils.CommaJoin(wanted_instance_names), 576 utils.CommaJoin(owned_instance_names)), 577 errors.ECODE_STATE) 578 else: 579 affected_instances = None 580 581 if (self.op.master_candidate is not None or 582 self.op.drained is not None or 583 self.op.offline is not None): 584 # we can't change the master's node flags 585 if node.uuid == self.cfg.GetMasterNode(): 586 raise errors.OpPrereqError("The master role can be changed" 587 " only via master-failover", 588 errors.ECODE_INVAL) 589 590 if self.op.master_candidate and not node.master_capable: 591 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 592 " it a master candidate" % node.name, 593 errors.ECODE_STATE) 594 595 if self.op.vm_capable is False: 596 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) 597 if ipri or isec: 598 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 599 " the vm_capable flag" % node.name, 600 errors.ECODE_STATE) 601 602 if node.master_candidate and self.might_demote and not self.lock_all: 603 assert not self.op.auto_promote, "auto_promote set but lock_all not" 604 # check if after removing the current node, we're missing master 605 # candidates 606 (mc_remaining, mc_should, _) = \ 607 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) 608 if mc_remaining < mc_should: 609 raise errors.OpPrereqError("Not enough master candidates, please" 610 " pass auto promote option to allow" 611 " promotion (--auto-promote or RAPI" 612 " auto_promote=True)", errors.ECODE_STATE) 613 614 self.old_flags = old_flags = (node.master_candidate, 615 node.drained, node.offline) 616 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 617 self.old_role = old_role = self._F2R[old_flags] 618 619 # Check for ineffective changes 620 for attr in self._FLAGS: 621 if getattr(self.op, attr) is False and getattr(node, attr) is False: 622 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 623 setattr(self.op, attr, None) 624 625 # Past this point, any flag change to False means a transition 626 # away from the respective state, as only real changes are kept 627 628 # TODO: We might query the real power state if it supports OOB 629 if SupportsOob(self.cfg, node): 630 if self.op.offline is False and not (node.powered or 631 self.op.powered is True): 632 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 633 " offline status can be reset") % 634 self.op.node_name, errors.ECODE_STATE) 635 elif self.op.powered is not None: 636 raise errors.OpPrereqError(("Unable to change powered state for node %s" 637 " as it does not support out-of-band" 638 " handling") % self.op.node_name, 639 errors.ECODE_STATE) 640 641 # If we're being deofflined/drained, we'll MC ourself if needed 642 if (self.op.drained is False or 
self.op.offline is False or 643 (self.op.master_capable and not node.master_capable)): 644 if _DecideSelfPromotion(self): 645 self.op.master_candidate = True 646 self.LogInfo("Auto-promoting node to master candidate") 647 648 # If we're no longer master capable, we'll demote ourselves from MC 649 if self.op.master_capable is False and node.master_candidate: 650 if self.op.node_uuid == self.cfg.GetMasterNode(): 651 raise errors.OpPrereqError("Master must remain master capable", 652 errors.ECODE_STATE) 653 self.LogInfo("Demoting from master candidate") 654 self.op.master_candidate = False 655 656 # Compute new role 657 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 658 if self.op.master_candidate: 659 new_role = self._ROLE_CANDIDATE 660 elif self.op.drained: 661 new_role = self._ROLE_DRAINED 662 elif self.op.offline: 663 new_role = self._ROLE_OFFLINE 664 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 665 # False is still in new flags, which means we're un-setting (the 666 # only) True flag 667 new_role = self._ROLE_REGULAR 668 else: # no new flags, nothing, keep old role 669 new_role = old_role 670 671 self.new_role = new_role 672 673 if old_role == self._ROLE_OFFLINE and new_role != old_role: 674 # Trying to transition out of offline status 675 result = self.rpc.call_version([node.uuid])[node.uuid] 676 if result.fail_msg: 677 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 678 " to report its version: %s" % 679 (node.name, result.fail_msg), 680 errors.ECODE_STATE) 681 else: 682 self.LogWarning("Transitioning node from offline to online state" 683 " without using re-add. Please make sure the node" 684 " is healthy!") 685 686 # When changing the secondary ip, verify if this is a single-homed to 687 # multi-homed transition or vice versa, and apply the relevant 688 # restrictions. 
689 if self.op.secondary_ip: 690 # Ok even without locking, because this can't be changed by any LU 691 master = self.cfg.GetMasterNodeInfo() 692 master_singlehomed = master.secondary_ip == master.primary_ip 693 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 694 if self.op.force and node.uuid == master.uuid: 695 self.LogWarning("Transitioning from single-homed to multi-homed" 696 " cluster; all nodes will require a secondary IP" 697 " address") 698 else: 699 raise errors.OpPrereqError("Changing the secondary ip on a" 700 " single-homed cluster requires the" 701 " --force option to be passed, and the" 702 " target node to be the master", 703 errors.ECODE_INVAL) 704 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 705 if self.op.force and node.uuid == master.uuid: 706 self.LogWarning("Transitioning from multi-homed to single-homed" 707 " cluster; secondary IP addresses will have to be" 708 " removed") 709 else: 710 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 711 " same as the primary IP on a multi-homed" 712 " cluster, unless the --force option is" 713 " passed, and the target node is the" 714 " master", errors.ECODE_INVAL) 715 716 assert not (set([inst.name for inst in affected_instances.values()]) - 717 self.owned_locks(locking.LEVEL_INSTANCE)) 718 719 if node.offline: 720 if affected_instances: 721 msg = ("Cannot change secondary IP address: offline node has" 722 " instances (%s) configured to use it" % 723 utils.CommaJoin( 724 [inst.name for inst in affected_instances.values()])) 725 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 726 else: 727 # On online nodes, check that no instances are running, and that 728 # the node has the new ip and we can reach it. 729 for instance in affected_instances.values(): 730 CheckInstanceState(self, instance, INSTANCE_DOWN, 731 msg="cannot change secondary ip") 732 733 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) 734 if master.uuid != node.uuid: 735 # check reachability from master secondary ip to new secondary ip 736 if not netutils.TcpPing(self.op.secondary_ip, 737 constants.DEFAULT_NODED_PORT, 738 source=master.secondary_ip): 739 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 740 " based ping to node daemon port", 741 errors.ECODE_ENVIRON) 742 743 if self.op.ndparams: 744 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) 745 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 746 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 747 "node", "cluster or group") 748 self.new_ndparams = new_ndparams 749 750 if self.op.hv_state: 751 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 752 node.hv_state_static) 753 754 if self.op.disk_state: 755 self.new_disk_state = \ 756 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
757
758 - def Exec(self, feedback_fn):
759 """Modifies a node. 760 761 """ 762 node = self.cfg.GetNodeInfo(self.op.node_uuid) 763 result = [] 764 765 if self.op.ndparams: 766 node.ndparams = self.new_ndparams 767 768 if self.op.powered is not None: 769 node.powered = self.op.powered 770 771 if self.op.hv_state: 772 node.hv_state_static = self.new_hv_state 773 774 if self.op.disk_state: 775 node.disk_state_static = self.new_disk_state 776 777 for attr in ["master_capable", "vm_capable"]: 778 val = getattr(self.op, attr) 779 if val is not None: 780 setattr(node, attr, val) 781 result.append((attr, str(val))) 782 783 if self.op.secondary_ip: 784 node.secondary_ip = self.op.secondary_ip 785 result.append(("secondary_ip", self.op.secondary_ip)) 786 787 # this will trigger configuration file update, if needed 788 self.cfg.Update(node, feedback_fn) 789 790 if self.new_role != self.old_role: 791 new_flags = self._R2F[self.new_role] 792 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): 793 if of != nf: 794 result.append((desc, str(nf))) 795 (node.master_candidate, node.drained, node.offline) = new_flags 796 self.cfg.Update(node, feedback_fn) 797 798 # Tell the node to demote itself, if no longer MC and not offline. 799 # This must be done only after the configuration is updated so that 800 # it's ensured the node won't receive any further configuration updates. 801 if self.old_role == self._ROLE_CANDIDATE and \ 802 self.new_role != self._ROLE_OFFLINE: 803 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg 804 if msg: 805 self.LogWarning("Node failed to demote itself: %s", msg) 806 807 # we locked all nodes, we adjust the CP before updating this node 808 if self.lock_all: 809 AdjustCandidatePool(self, [node.uuid]) 810 811 # if node gets promoted, grant RPC priviledges 812 if self.new_role == self._ROLE_CANDIDATE: 813 AddNodeCertToCandidateCerts(self, self.cfg, node.uuid) 814 # if node is demoted, revoke RPC priviledges 815 if self.old_role == self._ROLE_CANDIDATE: 816 RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid) 817 818 # this will trigger job queue propagation or cleanup if the mc 819 # flag changed 820 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1: 821 self.context.ReaddNode(node) 822 823 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid]) 824 825 return result

class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload

def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]

def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)

def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceSecondaryNodes(inst.uuid))

def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceNodes(inst.uuid))

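# Note: the three helpers above filter the full instance table from the
# configuration; LUNodeEvacuate and LURepairNodeStorage below use them to
# build the per-node instance lists they operate on.
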
896 -class LUNodeEvacuate(NoHooksLU):
897 """Evacuates instances off a list of nodes. 898 899 """ 900 REQ_BGL = False 901
902 - def CheckArguments(self):
903 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
904
905 - def ExpandNames(self):
906 (self.op.node_uuid, self.op.node_name) = \ 907 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 908 909 if self.op.remote_node is not None: 910 (self.op.remote_node_uuid, self.op.remote_node) = \ 911 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 912 self.op.remote_node) 913 assert self.op.remote_node 914 915 if self.op.node_uuid == self.op.remote_node_uuid: 916 raise errors.OpPrereqError("Can not use evacuated node as a new" 917 " secondary node", errors.ECODE_INVAL) 918 919 if self.op.mode != constants.NODE_EVAC_SEC: 920 raise errors.OpPrereqError("Without the use of an iallocator only" 921 " secondary instances can be evacuated", 922 errors.ECODE_INVAL) 923 924 # Declare locks 925 self.share_locks = ShareAll() 926 self.needed_locks = { 927 locking.LEVEL_INSTANCE: [], 928 locking.LEVEL_NODEGROUP: [], 929 locking.LEVEL_NODE: [], 930 } 931 932 # Determine nodes (via group) optimistically, needs verification once locks 933 # have been acquired 934 self.lock_nodes = self._DetermineNodes()
935
936 - def _DetermineNodes(self):
937 """Gets the list of node UUIDs to operate on. 938 939 """ 940 if self.op.remote_node is None: 941 # Iallocator will choose any node(s) in the same group 942 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) 943 else: 944 group_nodes = frozenset([self.op.remote_node_uuid]) 945 946 # Determine nodes to be locked 947 return set([self.op.node_uuid]) | group_nodes
948
949 - def _DetermineInstances(self):
950 """Builds list of instances to operate on. 951 952 """ 953 assert self.op.mode in constants.NODE_EVAC_MODES 954 955 if self.op.mode == constants.NODE_EVAC_PRI: 956 # Primary instances only 957 inst_fn = _GetNodePrimaryInstances 958 assert self.op.remote_node is None, \ 959 "Evacuating primary instances requires iallocator" 960 elif self.op.mode == constants.NODE_EVAC_SEC: 961 # Secondary instances only 962 inst_fn = _GetNodeSecondaryInstances 963 else: 964 # All instances 965 assert self.op.mode == constants.NODE_EVAC_ALL 966 inst_fn = _GetNodeInstances 967 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 968 # per instance 969 raise errors.OpPrereqError("Due to an issue with the iallocator" 970 " interface it is not possible to evacuate" 971 " all instances at once; specify explicitly" 972 " whether to evacuate primary or secondary" 973 " instances", 974 errors.ECODE_INVAL) 975 976 return inst_fn(self.cfg, self.op.node_uuid)
977
978 - def DeclareLocks(self, level):
979 if level == locking.LEVEL_INSTANCE: 980 # Lock instances optimistically, needs verification once node and group 981 # locks have been acquired 982 self.needed_locks[locking.LEVEL_INSTANCE] = \ 983 set(i.name for i in self._DetermineInstances()) 984 985 elif level == locking.LEVEL_NODEGROUP: 986 # Lock node groups for all potential target nodes optimistically, needs 987 # verification once nodes have been acquired 988 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 989 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 990 991 elif level == locking.LEVEL_NODE: 992 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
993
994 - def CheckPrereq(self):
995 # Verify locks 996 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 997 owned_nodes = self.owned_locks(locking.LEVEL_NODE) 998 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 999 1000 need_nodes = self._DetermineNodes() 1001 1002 if not owned_nodes.issuperset(need_nodes): 1003 raise errors.OpPrereqError("Nodes in same group as '%s' changed since" 1004 " locks were acquired, current nodes are" 1005 " are '%s', used to be '%s'; retry the" 1006 " operation" % 1007 (self.op.node_name, 1008 utils.CommaJoin(need_nodes), 1009 utils.CommaJoin(owned_nodes)), 1010 errors.ECODE_STATE) 1011 1012 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) 1013 if owned_groups != wanted_groups: 1014 raise errors.OpExecError("Node groups changed since locks were acquired," 1015 " current groups are '%s', used to be '%s';" 1016 " retry the operation" % 1017 (utils.CommaJoin(wanted_groups), 1018 utils.CommaJoin(owned_groups))) 1019 1020 # Determine affected instances 1021 self.instances = self._DetermineInstances() 1022 self.instance_names = [i.name for i in self.instances] 1023 1024 if set(self.instance_names) != owned_instance_names: 1025 raise errors.OpExecError("Instances on node '%s' changed since locks" 1026 " were acquired, current instances are '%s'," 1027 " used to be '%s'; retry the operation" % 1028 (self.op.node_name, 1029 utils.CommaJoin(self.instance_names), 1030 utils.CommaJoin(owned_instance_names))) 1031 1032 if self.instance_names: 1033 self.LogInfo("Evacuating instances from node '%s': %s", 1034 self.op.node_name, 1035 utils.CommaJoin(utils.NiceSort(self.instance_names))) 1036 else: 1037 self.LogInfo("No instances to evacuate from node '%s'", 1038 self.op.node_name) 1039 1040 if self.op.remote_node is not None: 1041 for i in self.instances: 1042 if i.primary_node == self.op.remote_node_uuid: 1043 raise errors.OpPrereqError("Node %s is the primary node of" 1044 " instance %s, cannot use it as" 1045 " secondary" % 1046 (self.op.remote_node, i.name), 1047 errors.ECODE_INVAL)
1048
1049 - def Exec(self, feedback_fn):
1050 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 1051 1052 if not self.instance_names: 1053 # No instances to evacuate 1054 jobs = [] 1055 1056 elif self.op.iallocator is not None: 1057 # TODO: Implement relocation to other group 1058 req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode, 1059 instances=list(self.instance_names)) 1060 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 1061 1062 ial.Run(self.op.iallocator) 1063 1064 if not ial.success: 1065 raise errors.OpPrereqError("Can't compute node evacuation using" 1066 " iallocator '%s': %s" % 1067 (self.op.iallocator, ial.info), 1068 errors.ECODE_NORES) 1069 1070 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1071 1072 elif self.op.remote_node is not None: 1073 assert self.op.mode == constants.NODE_EVAC_SEC 1074 jobs = [ 1075 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1076 remote_node=self.op.remote_node, 1077 disks=[], 1078 mode=constants.REPLACE_DISK_CHG, 1079 early_release=self.op.early_release)] 1080 for instance_name in self.instance_names] 1081 1082 else: 1083 raise errors.ProgrammerError("No iallocator or remote node") 1084 1085 return ResultWithJobs(jobs)
1086 1087
1088 -class LUNodeMigrate(LogicalUnit):
1089 """Migrate all instances from a node. 1090 1091 """ 1092 HPATH = "node-migrate" 1093 HTYPE = constants.HTYPE_NODE 1094 REQ_BGL = False 1095
1096 - def CheckArguments(self):
1097 pass
1098
1099 - def ExpandNames(self):
1100 (self.op.node_uuid, self.op.node_name) = \ 1101 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1102 1103 self.share_locks = ShareAll() 1104 self.needed_locks = { 1105 locking.LEVEL_NODE: [self.op.node_uuid], 1106 }
1107
1108 - def BuildHooksEnv(self):
1109 """Build hooks env. 1110 1111 This runs on the master, the primary and all the secondaries. 1112 1113 """ 1114 return { 1115 "NODE_NAME": self.op.node_name, 1116 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 1117 }
1118
1119 - def BuildHooksNodes(self):
1120 """Build hooks nodes. 1121 1122 """ 1123 nl = [self.cfg.GetMasterNode()] 1124 return (nl, nl)
1125
1126 - def CheckPrereq(self):
1127 pass
1128
1129 - def Exec(self, feedback_fn):
1130 # Prepare jobs for migration instances 1131 jobs = [ 1132 [opcodes.OpInstanceMigrate( 1133 instance_name=inst.name, 1134 mode=self.op.mode, 1135 live=self.op.live, 1136 iallocator=self.op.iallocator, 1137 target_node=self.op.target_node, 1138 allow_runtime_changes=self.op.allow_runtime_changes, 1139 ignore_ipolicy=self.op.ignore_ipolicy)] 1140 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)] 1141 1142 # TODO: Run iallocator in this opcode and pass correct placement options to 1143 # OpInstanceMigrate. Since other jobs can modify the cluster between 1144 # running the iallocator and the actual migration, a good consistency model 1145 # will have to be found. 1146 1147 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == 1148 frozenset([self.op.node_uuid])) 1149 1150 return ResultWithJobs(jobs)

def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage

  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    return [[cfg.GetSharedFileStorageDir()]]
  elif storage_type == constants.ST_GLUSTER:
    return [[cfg.GetGlusterStorageDir()]]
  else:
    return []

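# Illustration: for ST_FILE this evaluates to [[<cluster file storage dir>]],
# which is passed verbatim as the extra storage arguments of the
# call_storage_* RPCs below; for the remaining storage types the empty list
# means no extra arguments are sent.
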
1169 -class LUNodeModifyStorage(NoHooksLU):
1170 """Logical unit for modifying a storage volume on a node. 1171 1172 """ 1173 REQ_BGL = False 1174
1175 - def CheckArguments(self):
1176 (self.op.node_uuid, self.op.node_name) = \ 1177 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1178 1179 storage_type = self.op.storage_type 1180 1181 try: 1182 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] 1183 except KeyError: 1184 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1185 " modified" % storage_type, 1186 errors.ECODE_INVAL) 1187 1188 diff = set(self.op.changes.keys()) - modifiable 1189 if diff: 1190 raise errors.OpPrereqError("The following fields can not be modified for" 1191 " storage units of type '%s': %r" % 1192 (storage_type, list(diff)), 1193 errors.ECODE_INVAL)
1194
1195 - def CheckPrereq(self):
1196 """Check prerequisites. 1197 1198 """ 1199 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1200
1201 - def ExpandNames(self):
1202 self.needed_locks = { 1203 locking.LEVEL_NODE: self.op.node_uuid, 1204 }
1205
1206 - def Exec(self, feedback_fn):
1207 """Computes the list of nodes and their attributes. 1208 1209 """ 1210 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1211 result = self.rpc.call_storage_modify(self.op.node_uuid, 1212 self.op.storage_type, st_args, 1213 self.op.name, self.op.changes) 1214 result.Raise("Failed to modify storage unit '%s' on %s" % 1215 (self.op.name, self.op.node_name))

def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)

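# Example (illustrative): if a caller requests an output field that is not in
# the valid field set (for instance a misspelt volume field name),
# NonMatching() returns the offending names and the request is rejected with
# ECODE_INVAL during argument checking, before any RPC is made.
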
1233 -class LUNodeQueryvols(NoHooksLU):
1234 """Logical unit for getting volumes on node(s). 1235 1236 """ 1237 REQ_BGL = False 1238
  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE,
                                      constants.VF_INSTANCE),
                       self.op.output_fields)

1245 - def ExpandNames(self):
1246 self.share_locks = ShareAll() 1247 1248 if self.op.nodes: 1249 self.needed_locks = { 1250 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1251 } 1252 else: 1253 self.needed_locks = { 1254 locking.LEVEL_NODE: locking.ALL_SET, 1255 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1256 }
1257
1258 - def Exec(self, feedback_fn):
1259 """Computes the list of nodes and their attributes. 1260 1261 """ 1262 node_uuids = self.owned_locks(locking.LEVEL_NODE) 1263 volumes = self.rpc.call_node_volumes(node_uuids) 1264 1265 ilist = self.cfg.GetAllInstancesInfo() 1266 vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values()) 1267 1268 output = [] 1269 for node_uuid in node_uuids: 1270 nresult = volumes[node_uuid] 1271 if nresult.offline: 1272 continue 1273 msg = nresult.fail_msg 1274 if msg: 1275 self.LogWarning("Can't compute volume data on node %s: %s", 1276 self.cfg.GetNodeName(node_uuid), msg) 1277 continue 1278 1279 node_vols = sorted(nresult.payload, 1280 key=operator.itemgetter(constants.VF_DEV)) 1281 1282 for vol in node_vols: 1283 node_output = [] 1284 for field in self.op.output_fields: 1285 if field == constants.VF_NODE: 1286 val = self.cfg.GetNodeName(node_uuid) 1287 elif field == constants.VF_PHYS: 1288 val = vol[constants.VF_DEV] 1289 elif field == constants.VF_VG: 1290 val = vol[constants.VF_VG] 1291 elif field == constants.VF_NAME: 1292 val = vol[constants.VF_NAME] 1293 elif field == constants.VF_SIZE: 1294 val = int(float(vol[constants.VF_SIZE])) 1295 elif field == constants.VF_INSTANCE: 1296 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" + 1297 vol[constants.VF_NAME]), None) 1298 if inst is not None: 1299 val = inst.name 1300 else: 1301 val = "-" 1302 else: 1303 raise errors.ParameterError(field) 1304 node_output.append(str(val)) 1305 1306 output.append(node_output) 1307 1308 return output
1309 1310
1311 -class LUNodeQueryStorage(NoHooksLU):
1312 """Logical unit for getting information on storage units on node(s). 1313 1314 """ 1315 REQ_BGL = False 1316
1317 - def CheckArguments(self):
1318 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS), 1319 self.op.output_fields)
1320
1321 - def ExpandNames(self):
1322 self.share_locks = ShareAll() 1323 1324 if self.op.nodes: 1325 self.needed_locks = { 1326 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1327 } 1328 else: 1329 self.needed_locks = { 1330 locking.LEVEL_NODE: locking.ALL_SET, 1331 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1332 }
1333
1334 - def _DetermineStorageType(self):
1335 """Determines the default storage type of the cluster. 1336 1337 """ 1338 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates 1339 default_storage_type = \ 1340 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]] 1341 return default_storage_type
1342
1343 - def CheckPrereq(self):
1344 """Check prerequisites. 1345 1346 """ 1347 if self.op.storage_type: 1348 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1349 self.storage_type = self.op.storage_type 1350 else: 1351 self.storage_type = self._DetermineStorageType() 1352 supported_storage_types = constants.STS_REPORT_NODE_STORAGE 1353 if self.storage_type not in supported_storage_types: 1354 raise errors.OpPrereqError( 1355 "Storage reporting for storage type '%s' is not supported. Please" 1356 " use the --storage-type option to specify one of the supported" 1357 " storage types (%s) or set the default disk template to one that" 1358 " supports storage reporting." % 1359 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1360
1361 - def Exec(self, feedback_fn):
1362 """Computes the list of nodes and their attributes. 1363 1364 """ 1365 if self.op.storage_type: 1366 self.storage_type = self.op.storage_type 1367 else: 1368 self.storage_type = self._DetermineStorageType() 1369 1370 self.node_uuids = self.owned_locks(locking.LEVEL_NODE) 1371 1372 # Always get name to sort by 1373 if constants.SF_NAME in self.op.output_fields: 1374 fields = self.op.output_fields[:] 1375 else: 1376 fields = [constants.SF_NAME] + self.op.output_fields 1377 1378 # Never ask for node or type as it's only known to the LU 1379 for extra in [constants.SF_NODE, constants.SF_TYPE]: 1380 while extra in fields: 1381 fields.remove(extra) 1382 1383 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) 1384 name_idx = field_idx[constants.SF_NAME] 1385 1386 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type) 1387 data = self.rpc.call_storage_list(self.node_uuids, 1388 self.storage_type, st_args, 1389 self.op.name, fields) 1390 1391 result = [] 1392 1393 for node_uuid in utils.NiceSort(self.node_uuids): 1394 node_name = self.cfg.GetNodeName(node_uuid) 1395 nresult = data[node_uuid] 1396 if nresult.offline: 1397 continue 1398 1399 msg = nresult.fail_msg 1400 if msg: 1401 self.LogWarning("Can't get storage data from node %s: %s", 1402 node_name, msg) 1403 continue 1404 1405 rows = dict([(row[name_idx], row) for row in nresult.payload]) 1406 1407 for name in utils.NiceSort(rows.keys()): 1408 row = rows[name] 1409 1410 out = [] 1411 1412 for field in self.op.output_fields: 1413 if field == constants.SF_NODE: 1414 val = node_name 1415 elif field == constants.SF_TYPE: 1416 val = self.storage_type 1417 elif field in field_idx: 1418 val = row[field_idx[field]] 1419 else: 1420 raise errors.ParameterError(field) 1421 1422 out.append(val) 1423 1424 result.append(out) 1425 1426 return result
1427 1428
1429 -class LUNodeRemove(LogicalUnit):
1430 """Logical unit for removing a node. 1431 1432 """ 1433 HPATH = "node-remove" 1434 HTYPE = constants.HTYPE_NODE 1435
1436 - def BuildHooksEnv(self):
1437 """Build hooks env. 1438 1439 """ 1440 return { 1441 "OP_TARGET": self.op.node_name, 1442 "NODE_NAME": self.op.node_name, 1443 }
1444
1445 - def BuildHooksNodes(self):
1446 """Build hooks nodes. 1447 1448 This doesn't run on the target node in the pre phase as a failed 1449 node would then be impossible to remove. 1450 1451 """ 1452 all_nodes = self.cfg.GetNodeList() 1453 try: 1454 all_nodes.remove(self.op.node_uuid) 1455 except ValueError: 1456 pass 1457 return (all_nodes, all_nodes)
1458
1459 - def CheckPrereq(self):
1460 """Check prerequisites. 1461 1462 This checks: 1463 - the node exists in the configuration 1464 - it does not have primary or secondary instances 1465 - it's not the master 1466 1467 Any errors are signaled by raising errors.OpPrereqError. 1468 1469 """ 1470 (self.op.node_uuid, self.op.node_name) = \ 1471 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1472 node = self.cfg.GetNodeInfo(self.op.node_uuid) 1473 assert node is not None 1474 1475 masternode = self.cfg.GetMasterNode() 1476 if node.uuid == masternode: 1477 raise errors.OpPrereqError("Node is the master node, failover to another" 1478 " node is required", errors.ECODE_INVAL) 1479 1480 for _, instance in self.cfg.GetAllInstancesInfo().items(): 1481 if node.uuid in self.cfg.GetInstanceNodes(instance.uuid): 1482 raise errors.OpPrereqError("Instance %s is still running on the node," 1483 " please remove first" % instance.name, 1484 errors.ECODE_INVAL) 1485 self.op.node_name = node.name 1486 self.node = node
1487
1488 - def Exec(self, feedback_fn):
1489 """Removes the node from the cluster. 1490 1491 """ 1492 logging.info("Stopping the node daemon and removing configs from node %s", 1493 self.node.name) 1494 1495 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1496 1497 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 1498 "Not owning BGL" 1499 1500 # Promote nodes to master candidate as needed 1501 AdjustCandidatePool(self, [self.node.uuid]) 1502 self.context.RemoveNode(self.cfg, self.node) 1503 1504 # Run post hooks on the node before it's removed 1505 RunPostHook(self, self.node.name) 1506 1507 # we have to call this by name rather than by UUID, as the node is no longer 1508 # in the config 1509 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup) 1510 msg = result.fail_msg 1511 if msg: 1512 self.LogWarning("Errors encountered on the remote node while leaving" 1513 " the cluster: %s", msg) 1514 1515 cluster = self.cfg.GetClusterInfo() 1516 1517 # Remove node from candidate certificate list 1518 if self.node.master_candidate: 1519 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid) 1520 1521 # Remove node from our /etc/hosts 1522 if cluster.modify_etc_hosts: 1523 master_node_uuid = self.cfg.GetMasterNode() 1524 result = self.rpc.call_etc_hosts_modify(master_node_uuid, 1525 constants.ETC_HOSTS_REMOVE, 1526 self.node.name, None) 1527 result.Raise("Can't update hosts file with new host data") 1528 RedistributeAncillaryFiles(self)
1529 1530
1531 -class LURepairNodeStorage(NoHooksLU):
1532 """Repairs the volume group on a node. 1533 1534 """ 1535 REQ_BGL = False 1536
1537 - def CheckArguments(self):
1538 (self.op.node_uuid, self.op.node_name) = \ 1539 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1540 1541 storage_type = self.op.storage_type 1542 1543 if (constants.SO_FIX_CONSISTENCY not in 1544 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): 1545 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1546 " repaired" % storage_type, 1547 errors.ECODE_INVAL)
1548
1549 - def ExpandNames(self):
1550 self.needed_locks = { 1551 locking.LEVEL_NODE: [self.op.node_uuid], 1552 }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

1570 - def CheckPrereq(self):
1571 """Check prerequisites. 1572 1573 """ 1574 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1575 1576 # Check whether any instance on this node has faulty disks 1577 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid): 1578 if not inst.disks_active: 1579 continue 1580 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid)) 1581 check_nodes.discard(self.op.node_uuid) 1582 for inst_node_uuid in check_nodes: 1583 self._CheckFaultyDisks(inst, inst_node_uuid)
1584
1585 - def Exec(self, feedback_fn):
1586 feedback_fn("Repairing storage unit '%s' on %s ..." % 1587 (self.op.name, self.op.node_name)) 1588 1589 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1590 result = self.rpc.call_storage_execute(self.op.node_uuid, 1591 self.op.storage_type, st_args, 1592 self.op.name, 1593 constants.SO_FIX_CONSISTENCY) 1594 result.Raise("Failed to repair storage unit '%s' on %s" % 1595 (self.op.name, self.op.node_name))
1596
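
# Usage note (illustrative, not part of the original module text): these
# logical units are normally not instantiated directly; they are reached by
# submitting the matching opcodes (e.g. opcodes.OpNodeAdd or
# opcodes.OpNodeSetParams) to the master daemon, which dispatches each opcode
# to its LU* class.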