Package ganeti :: Package cmdlib :: Module node
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey 
  57   
  58   
def _DecideSelfPromotion(lu, exceptions=None):
  """Tell whether the node at hand should become a master candidate.

  @param lu: object whose C{cfg} attribute gives access to the cluster
      configuration
  @param exceptions: handed over unchanged to
      C{lu.cfg.GetMasterCandidateStats}
  @rtype: boolean
  @return: True if the current number of master candidates is lower than
      the desired number plus one, the latter being capped at the cluster's
      candidate pool size

  """
  pool_size = lu.cfg.GetClusterInfo().candidate_pool_size
  (current, desired, _) = lu.cfg.GetMasterCandidateStats(exceptions)

  # The node being handled adds one to the desired count, but the pool
  # size stays the upper bound
  target = desired + 1
  if target > pool_size:
    target = pool_size

  return current < target
68 69
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Verify via RPC that a node owns the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which the check is made
  @type node: L{objects.Node}
  @param node: the node to query
  @type secondary_ip: string
  @param secondary_ip: the ip address the node is expected to have
  @type prereq: boolean
  @param prereq: selects the error class: prerequisite error if True,
      execution error otherwise
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # The node may be brand new and hence lack a UUID, which is why the RPC
  # is addressed by node name
  answer = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  answer.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if answer.payload:
    return

  text = ("Node claims it doesn't have the secondary ip you gave (%s),"
          " please fix and re-run this command" % secondary_ip)
  if not prereq:
    raise errors.OpExecError(text)
  raise errors.OpPrereqError(text, errors.ECODE_ENVIRON)
98 99
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  The same LU handles both adding a brand new node and re-adding a node
  which is already part of the configuration (C{self.op.readd}).

  @cvar _NFLAGS: names of the capability flags which are copied from the
      opcode to the node object

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    """Resolve the node name and reject invalid re-add requests.

    Stores the cluster's primary IP family and the resolved hostname on the
    LU and replaces C{self.op.node_name} with the resolved name.

    @raise errors.OpPrereqError: if the master node is being re-added or if
        a node group is passed together with a re-add request

    """
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: a (pre, post) tuple holding the same list twice: all nodes of
        the configuration, minus the node being (re-)added if it is already
        known to the configuration

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # The new node itself is not part of either list here; it is appended to
    # the post hook nodes by PreparePostHookNodes
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    """Append the UUID of the new node to the post hook nodes.

    @param post_hook_node_uuids: list of node UUIDs computed so far
    @return: a new list consisting of the given UUIDs followed by the UUID
        of C{self.new_node}

    """
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config (or, for a re-add, that it
       is in the config)
     - it is resolvable
     - its IP addresses do not conflict with those of other nodes
     - its parameters (single/dual homed) matches the cluster
     - it is reachable and speaks the same protocol version
     - if the cluster has a volume group name, the node's PVs pass
       verification

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        # The re-added node may come back with a new primary IP, but its
        # secondary IP has to be the one stored in the configuration
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    # On a re-add the node itself is passed as exception when computing the
    # master candidate statistics
    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
        )[node_name]
      # Like every other RPC result in this module, check for failure before
      # touching the payload, so that a failed call is reported as a
      # prerequisite error instead of being handed on to the PV checks
      result.Raise("Can't verify the PVs on node %s" % node_name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    """Configure OpenvSwitch on the new node if its parameters ask for it.

    The node parameters are taken from the cluster's filled-in view for the
    new node and its group. Nothing is done unless the C{ND_OVS} parameter
    is set.

    """
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    Both RPCs are sent to the master node only.

    @param new_node_uuid: UUID of the node which was added
    @param new_node_name: name of the node which was added
    @param is_master_candidate: passed on to the key addition RPC
    @param is_potential_master_candidate: passed on to the key addition RPC
    @param rpcrunner: the RPC runner used for the SSH key calls
    @type readd: boolean
    @param readd: whether or not this node is readded; if so, the node's
        previous keys are removed before the new ones are added
    @param feedback_fn: function used to report failed SSH updates

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
      # clear previous keys
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True, # from authorized keys
        True, # from public keys
        False, # clear authorized keys
        True, # clear public keys
        True) # it's a readd
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    @param feedback_fn: function used to report progress and problems

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    # The master verifies that it can talk to the new node
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        # Report every failure before aborting
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.cfg.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # We create a new certificate even if the node is readded
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # Update SSH setup of all nodes
    if self.op.node_setup:
      # FIXME: so far, all nodes are considered potential master candidates
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)
480 481
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    """Validate the requested modifications and derive the locking needs.

    Sets C{self.might_demote}, C{self.lock_all} and C{self.lock_instances}.

    @raise errors.OpPrereqError: if no modification was requested, if
        mutually exclusive flags are enabled together or if the secondary
        IP is not a valid IPv4 address

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    # Only flags which can end up deciding the node role have to be mutually
    # exclusive: the three role flags themselves, plus master_capable, since
    # CheckPrereq may auto-promote the node to master candidate when
    # master_capable gets enabled. vm_capable has no influence on the role,
    # hence enabling it together with one of the others is legitimate.
    exclusive_mods = [self.op.offline, self.op.master_candidate,
                      self.op.drained, self.op.master_capable]
    if exclusive_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    @param instance: the instance to examine
    @return: a true value if the instance has at least one disk of a type
        in C{constants.DTS_INT_MIRROR} and the node being modified is among
        the instance's nodes

    """
    disks = self.cfg.GetInstanceDisks(instance.uuid)
    any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)
    return (any_mirrored and
            self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))

  def ExpandNames(self):
    """Declare the node, node resource and instance locks to acquire.

    All nodes are locked if C{self.lock_all} is set, otherwise only the
    node being modified. Instance locks are requested only if
    C{self.lock_instances} is set.

    """
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    The environment reflects the values requested in the opcode; see
    L{BuildHooksNodes} for the nodes the hooks are run on.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: a (pre, post) tuple holding the same list twice: the master
        node and the node being modified

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This verifies the instance locks (when the secondary IP is changed),
    validates the requested flag, capability, power state and secondary IP
    changes against the current state of the node and the cluster, and
    computes the old and new role of the node as well as the new node
    parameters and hypervisor/disk states.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      if self.op.node_uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("Master must remain master capable",
                                   errors.ECODE_STATE)
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      # A non-empty secondary IP implies lock_instances, hence
      # affected_instances is a dictionary here
      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    @param feedback_fn: function used to report progress and problems
    @rtype: list
    @return: list of (name, new value) tuples, one for each capability
        flag, secondary IP or role flag which was changed

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      # Report each role flag whose value differs between old and new role
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # Tell the node to demote itself, if no longer MC and not offline.
      # This must be done only after the configuration is updated so that
      # it's ensured the node won't receive any further configuration updates.
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(
            self, [node.uuid], master_node, potential_master_candidates,
            feedback_fn, modify_ssh_setup)

      # if node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:

      if modify_ssh_setup:
        if self.old_role == self._ROLE_CANDIDATE:
          master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
          ssh_result = self.rpc.call_node_ssh_key_remove(
            [master_node],
            node.uuid, node.name,
            master_candidate_uuids, potential_master_candidates,
            True, # remove node's key from all nodes' authorized_keys file
            False, # currently, all nodes are potential master candidates
            False, # do not clear node's 'authorized_keys'
            False, # do not clear node's 'ganeti_pub_keys'
            False) # no readd
          ssh_result[master_node].Raise(
            "Could not adjust the SSH setup after demoting node '%s'"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

        if self.new_role == self._ROLE_CANDIDATE:
          AddMasterCandidateSshKey(
            self, master_node, node, potential_master_candidates, feedback_fn)

    return result
888 889
class LUNodePowercycle(NoHooksLU):
  """Logical unit powercycling a single node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolves the node and protects the master node.

    @raise errors.OpPrereqError: if the node is the master node and
        the force parameter was not given

    """
    (node_uuid, node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    self.op.node_uuid = node_uuid
    self.op.node_name = node_name

    targets_master = (self.op.node_uuid == self.cfg.GetMasterNode())
    if targets_master and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declares the (empty) set of needed locks.

    Powercycling is a last-resort option which shouldn't block on other
    jobs, hence no locks are acquired at all.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Asks the node to powercycle itself.

    The cluster's default hypervisor and its cluster-level parameters
    are passed along with the request.

    @return: the payload of the powercycle RPC result

    """
    hv_name = self.cfg.GetHypervisorType()
    hv_params = self.cfg.GetClusterInfo().hvparams[hv_name]
    rpc_result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                               hv_name, hv_params)
    rpc_result.Raise("Failed to schedule the reboot")
    return rpc_result.payload
925 926
927 -def _GetNodeInstancesInner(cfg, fn):
928 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
929 930
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns the instances having the given node as primary node.

  @param cfg: configuration object
  @param node_uuid: UUID compared against each instance's primary node
  @return: list of instance objects

  """
  def _IsPrimaryHere(inst):
    return node_uuid == inst.primary_node

  return _GetNodeInstancesInner(cfg, _IsPrimaryHere)
937 938
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns the instances having the given node as a secondary node.

  @param cfg: configuration object
  @param node_uuid: UUID looked up in each instance's secondary nodes
  @return: list of instance objects

  """
  def _IsSecondaryHere(inst):
    return node_uuid in cfg.GetInstanceSecondaryNodes(inst.uuid)

  return _GetNodeInstancesInner(cfg, _IsSecondaryHere)
946 947
def _GetNodeInstances(cfg, node_uuid):
  """Returns all instances using the given node, primary or secondary.

  @param cfg: configuration object
  @param node_uuid: UUID looked up in each instance's node list
  @return: list of instance objects

  """
  def _UsesNode(inst):
    return node_uuid in cfg.GetInstanceNodes(inst.uuid)

  return _GetNodeInstancesInner(cfg, _UsesNode)
956 957
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  The target is chosen either by an iallocator or given explicitly as a
  remote node; the latter is only accepted for secondary instances.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Checks the iallocator/remote node arguments.

    """
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    """Resolves node names and declares the needed locks.

    @raise errors.OpPrereqError: if the remote node is the evacuated node
        itself, or if a remote node is given for a mode other than
        secondary evacuation

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    @return: set containing the evacuated node plus either the explicit
        remote node or all members of the evacuated node's group

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    @return: list of instance objects selected by the evacuation mode
    @raise errors.OpPrereqError: for the "all instances" mode, which is
        rejected here

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    """Fills in the locks needed at the given locking level.

    @param level: the locking level being declared

    """
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    """Verifies the optimistically acquired locks and the target node.

    Nodes, node groups and instances are computed again and compared
    with the owned locks; the affected instances are stored in
    C{self.instances} and C{self.instance_names}.

    @raise errors.OpPrereqError: if the needed nodes are not all locked,
        or if the remote node is the primary node of an affected instance
    @raise errors.OpExecError: if the node groups or the instances
        changed since the locks were acquired

    """
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      # The message used to read "current nodes are are '%s'"
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Computes the jobs evacuating the affected instances.

    @return: L{ResultWithJobs} holding one job per instance when a remote
        node is used, the jobs derived from the iallocator result when an
        iallocator is used, or no jobs if there is nothing to evacuate
    @raise errors.OpPrereqError: if the iallocator run was not successful

    """
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(
        evac_mode=self.op.mode, instances=list(self.instance_names),
        ignore_soft_errors=self.op.ignore_soft_errors)
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1149 1150
class LUNodeMigrate(LogicalUnit):
  """Logical unit migrating the primary instances of a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    """Does nothing; there are no arguments to verify.

    """
    pass

  def ExpandNames(self):
    """Resolves the node and requests a shared lock on it.

    """
    (node_uuid, node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    self.op.node_uuid = node_uuid
    self.op.node_name = node_name

    self.share_locks = ShareAll()
    self.needed_locks = {locking.LEVEL_NODE: [self.op.node_uuid]}

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: dictionary with the node name and the
        "allow runtime changes" setting of the opcode

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: tuple using the master node for both of its elements

    """
    hook_nodes = [self.cfg.GetMasterNode()]
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    """Does nothing; there are no prerequisites to verify.

    """
    pass

  def Exec(self, feedback_fn):
    """Creates one migration job per primary instance of the node.

    @return: L{ResultWithJobs} with a single-opcode job per instance

    """
    jobs = []
    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid):
      migrate_op = opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)
      jobs.append([migrate_op])

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
    assert (owned_nodes ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
1214 1215
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the extra arguments needed for a storage type.

  @param cfg: configuration object queried for the storage directories
  @param storage_type: one of the storage type constants
  @return: a list holding a one-element list with the storage directory
      for the file, shared-file and gluster types; an empty list for any
      other type

  """
  # Only the file-based storage types need a directory argument
  if storage_type == constants.ST_FILE:
    storage_dir = cfg.GetFileStorageDir()
  elif storage_type == constants.ST_SHARED_FILE:
    storage_dir = cfg.GetSharedFileStorageDir()
  elif storage_type == constants.ST_GLUSTER:
    storage_dir = cfg.GetGlusterStorageDir()
  else:
    return []

  return [[storage_dir]]
1230 1231
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolves the node and validates the requested changes.

    @raise errors.OpPrereqError: if the storage type has no modifiable
        fields or if a field which can not be modified is given

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that the storage type is enabled on the cluster.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    """Requests the lock of the node owning the storage unit.

    """
    # Lock names are passed as a list, like the other logical units of
    # this module do, instead of a bare string
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def Exec(self, feedback_fn):
    """Modifies the storage unit on the node.

    The requested changes are sent to the node via RPC; a failure of the
    call is raised through the result's C{Raise} method.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
1279 1280
1281 -def _CheckOutputFields(fields, selected):
1282 """Checks whether all selected fields are valid according to fields. 1283 1284 @type fields: L{utils.FieldSet} 1285 @param fields: fields set 1286 @type selected: L{utils.FieldSet} 1287 @param selected: fields set 1288 1289 """ 1290 delta = fields.NonMatching(selected) 1291 if delta: 1292 raise errors.OpPrereqError("Unknown output fields selected: %s" 1293 % ",".join(delta), errors.ECODE_INVAL)
1294 1295
1296 -class LUNodeQueryvols(NoHooksLU):
1297 """Logical unit for getting volumes on node(s). 1298 1299 """ 1300 REQ_BGL = False 1301
1302 - def CheckArguments(self):
1307
def ExpandNames(self):
  """Declares shared node locks for the queried nodes.

  The given nodes are locked if any were specified, otherwise all
  nodes are.

  """
  self.share_locks = ShareAll()

  if self.op.nodes:
    node_locks = GetWantedNodes(self, self.op.nodes)[0]
  else:
    node_locks = locking.ALL_SET

  self.needed_locks = {
    locking.LEVEL_NODE: node_locks,
    }
1319
def Exec(self, feedback_fn):
  """Collects the requested volume fields from the locked nodes.

  Offline nodes are skipped silently, nodes whose RPC call failed are
  skipped with a warning.

  @return: list of rows, one per volume, each a list of strings in the
      order of the requested output fields
  @raise errors.ParameterError: if an unhandled field was requested

  """
  node_uuids = self.owned_locks(locking.LEVEL_NODE)
  volumes = self.rpc.call_node_volumes(node_uuids)

  all_instances = self.cfg.GetAllInstancesInfo()
  vol2inst = MapInstanceLvsToNodes(self.cfg, all_instances.values())

  def _FieldValue(node_uuid, vol, field):
    """Computes the value of one output field for one volume."""
    if field == constants.VF_NODE:
      return self.cfg.GetNodeName(node_uuid)
    if field == constants.VF_PHYS:
      return vol[constants.VF_DEV]
    if field == constants.VF_VG:
      return vol[constants.VF_VG]
    if field == constants.VF_NAME:
      return vol[constants.VF_NAME]
    if field == constants.VF_SIZE:
      return int(float(vol[constants.VF_SIZE]))
    if field == constants.VF_INSTANCE:
      # Volumes are mapped to instances by node and "vg/name"
      lv_key = (node_uuid, vol[constants.VF_VG] + "/" +
                vol[constants.VF_NAME])
      owner = vol2inst.get(lv_key, None)
      if owner is None:
        return "-"
      return owner.name
    raise errors.ParameterError(field)

  rows = []
  for node_uuid in node_uuids:
    node_result = volumes[node_uuid]
    if node_result.offline:
      continue
    fail_msg = node_result.fail_msg
    if fail_msg:
      self.LogWarning("Can't compute volume data on node %s: %s",
                      self.cfg.GetNodeName(node_uuid), fail_msg)
      continue

    by_device = sorted(node_result.payload,
                       key=operator.itemgetter(constants.VF_DEV))

    for vol in by_device:
      rows.append([str(_FieldValue(node_uuid, vol, field))
                   for field in self.op.output_fields])

  return rows
1371 1372
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Verifies that only valid storage fields were requested.

    """
    valid_fields = utils.FieldSet(*constants.VALID_STORAGE_FIELDS)
    _CheckOutputFields(valid_fields, self.op.output_fields)

  def ExpandNames(self):
    """Declares shared node locks for the queried nodes.

    The given nodes are locked if any were specified, otherwise all
    nodes are.

    """
    self.share_locks = ShareAll()

    if self.op.nodes:
      node_locks = GetWantedNodes(self, self.op.nodes)[0]
    else:
      node_locks = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: node_locks,
      }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    @return: the storage type mapped to the first enabled disk template

    """
    first_template = self.cfg.GetClusterInfo().enabled_disk_templates[0]
    return constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[first_template]

  def CheckPrereq(self):
    """Check prerequisites.

    An explicitly given storage type has to be enabled; a defaulted one
    has to support storage reporting.

    @raise errors.OpPrereqError: if the default storage type does not
        support storage reporting

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
      return

    self.storage_type = self._DetermineStorageType()
    reporting_types = constants.STS_REPORT_NODE_STORAGE
    if self.storage_type not in reporting_types:
      raise errors.OpPrereqError(
        "Storage reporting for storage type '%s' is not supported. Please"
        " use the --storage-type option to specify one of the supported"
        " storage types (%s) or set the default disk template to one that"
        " supports storage reporting." %
        (self.storage_type, utils.CommaJoin(reporting_types)))

  def Exec(self, feedback_fn):
    """Queries the storage units of the locked nodes.

    Offline nodes are skipped silently, nodes whose RPC call failed are
    skipped with a warning.

    @return: list of rows, one per storage unit, holding the values of
        the requested output fields in order
    @raise errors.ParameterError: if an unhandled field was requested

    """
    self.storage_type = self.op.storage_type or self._DetermineStorageType()
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # The name is always requested as the rows are sorted by it
    requested = self.op.output_fields
    if constants.SF_NAME in requested:
      fields = requested[:]
    else:
      fields = [constants.SF_NAME] + requested

    # Node and type are only known to the LU, never ask the node for them
    lu_only = (constants.SF_NODE, constants.SF_TYPE)
    fields = [name for name in fields if name not in lu_only]

    field_idx = dict((name, idx) for (idx, name) in enumerate(fields))
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      node_result = data[node_uuid]
      if node_result.offline:
        continue

      fail_msg = node_result.fail_msg
      if fail_msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, fail_msg)
        continue

      rows_by_name = dict((row[name_idx], row) for row in node_result.payload)

      for unit_name in utils.NiceSort(rows_by_name.keys()):
        row = rows_by_name[unit_name]
        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            out.append(node_name)
          elif field == constants.SF_TYPE:
            out.append(self.storage_type)
          elif field in field_idx:
            out.append(row[field_idx[field]])
          else:
            raise errors.ParameterError(field)

        result.append(out)

    return result
1488 1489
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: dictionary naming the node as both target and node name

    """
    node_name = self.op.node_name
    return {
      "OP_TARGET": node_name,
      "NODE_NAME": node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    The node being removed is left out of the returned list, as a failed
    node would otherwise be impossible to remove.

    @return: tuple using the remaining nodes for both of its elements

    """
    remaining = self.cfg.GetNodeList()
    try:
      remaining.remove(self.op.node_uuid)
    except ValueError:
      # The node is not part of the list, nothing to leave out
      pass
    return (remaining, remaining)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (node_uuid, node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    self.op.node_uuid = node_uuid
    self.op.node_name = node_name

    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    if node.uuid == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance in self.cfg.GetAllInstancesInfo().values():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)

    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    The steps are, in order: SSH key removal (if the cluster manages the
    SSH setup), candidate pool adjustment, removal from the
    configuration, post hooks, asking the node to leave the cluster,
    candidate certificate removal and the /etc/hosts update.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    master_node = self.cfg.GetMasterNode()
    # Retrieved before the node is removed from the configuration
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    if modify_ssh_setup:
      is_potential_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      ssh_result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        node.master_candidate, # from_authorized_keys
        is_potential_candidate, # from_public_keys
        True, # clear node's 'authorized_keys'
        True, # clear node's 'ganeti_public_keys'
        False) # no readd
      ssh_result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, node.uuid))
      WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(
      self, [node.uuid], master_node, potential_master_candidates,
      feedback_fn, modify_ssh_setup)
    self.cfg.RemoveNode(node.uuid)

    # The post hooks are run on the node by name
    RunPostHook(self, node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    leave_result = self.rpc.call_node_leave_cluster(node.name,
                                                    modify_ssh_setup)
    leave_msg = leave_result.fail_msg
    if leave_msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", leave_msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      hosts_result = self.rpc.call_etc_hosts_modify(self.cfg.GetMasterNode(),
                                                    constants.ETC_HOSTS_REMOVE,
                                                    node.name, None)
      hosts_result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1614 1615
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolves the node and checks the storage type can be repaired.

    @raise errors.OpPrereqError: if the "fix consistency" operation is
        not valid for the storage type

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Requests the lock of the node to be repaired.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn.

    @param instance: the instance whose disks are checked
    @param node_uuid: UUID of the node the disks are checked on
    @raise errors.OpPrereqError: if the check fails and the opcode does
        not ask for consistency to be ignored

    """
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      # Covers both the error raised above and any OpPrereqError coming
      # from the disk check itself
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that the storage type is enabled and that no instance using
    this node has faulty disks on its other nodes.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      # Only the instance's other nodes are checked, not the node being
      # repaired
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Runs the "fix consistency" storage operation on the node.

    @param feedback_fn: callback used to report the start of the repair

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
1681