
Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey 
  57   
  58   
59 -def _DecideSelfPromotion(lu, exceptions=None):
  60    """Decide whether I should promote myself as a master candidate.
  61  
  62    """
  63    cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  64    mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  65    # the new node will increase mc_max by one, so:
  66    mc_should = min(mc_should + 1, cp_size)
  67    return mc_now < mc_should
68 69
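# Editor's sketch (not part of the module): the promotion arithmetic above,
# shown in isolation with made-up pool numbers. With a candidate_pool_size of
# 10, three current candidates and a current target of three, adding one node
# raises the target to four, so the new node promotes itself:
#
#   >>> cp_size, mc_now, mc_should = 10, 3, 3
#   >>> mc_should = min(mc_should + 1, cp_size)
#   >>> mc_now < mc_should
#   True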
70 -def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
71 """Ensure that a node has the given secondary ip. 72 73 @type lu: L{LogicalUnit} 74 @param lu: the LU on behalf of which we make the check 75 @type node: L{objects.Node} 76 @param node: the node to check 77 @type secondary_ip: string 78 @param secondary_ip: the ip to check 79 @type prereq: boolean 80 @param prereq: whether to throw a prerequisite or an execute error 81 @raise errors.OpPrereqError: if the node doesn't have the ip, 82 and prereq=True 83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False 84 85 """ 86 # this can be called with a new node, which has no UUID yet, so perform the 87 # RPC call using its name 88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip) 89 result.Raise("Failure checking secondary ip on node %s" % node.name, 90 prereq=prereq, ecode=errors.ECODE_ENVIRON) 91 if not result.payload: 92 msg = ("Node claims it doesn't have the secondary ip you gave (%s)," 93 " please fix and re-run this command" % secondary_ip) 94 if prereq: 95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) 96 else: 97 raise errors.OpExecError(msg)
98 99
100 -class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster. 102 103 """ 104 HPATH = "node-add" 105 HTYPE = constants.HTYPE_NODE 106 _NFLAGS = ["master_capable", "vm_capable"] 107
108 - def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily() 110 # validate/normalize the node name 111 self.hostname = netutils.GetHostname(name=self.op.node_name, 112 family=self.primary_ip_family) 113 self.op.node_name = self.hostname.name 114 115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName(): 116 raise errors.OpPrereqError("Cannot readd the master node", 117 errors.ECODE_STATE) 118 119 if self.op.readd and self.op.group: 120 raise errors.OpPrereqError("Cannot pass a node group when a node is" 121 " being readded", errors.ECODE_INVAL)
122
123 - def BuildHooksEnv(self):
124 """Build hooks env. 125 126 This will run on all nodes before, and on all nodes + the new node after. 127 128 """ 129 return { 130 "OP_TARGET": self.op.node_name, 131 "NODE_NAME": self.op.node_name, 132 "NODE_PIP": self.op.primary_ip, 133 "NODE_SIP": self.op.secondary_ip, 134 "MASTER_CAPABLE": str(self.op.master_capable), 135 "VM_CAPABLE": str(self.op.vm_capable), 136 }
137
138 - def BuildHooksNodes(self):
139 """Build hooks nodes. 140 141 """ 142 hook_nodes = self.cfg.GetNodeList() 143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name) 144 if new_node_info is not None: 145 # Exclude added node 146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid])) 147 148 # add the new node as post hook node by name; it does not have an UUID yet 149 return (hook_nodes, hook_nodes)
150
151 - def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
154 - def HooksAbortCallBack(self, phase, feedback_fn, exception):
 155    """Cleans up if the hooks fail.
 156  
 157    This function runs the actions that are necessary to bring the cluster
 158    back into a clean state. This is needed if, for example, the hooks of
 159    this operation failed and left the node in an inconsistent state.
 160  
 161    """
 162    if phase == constants.HOOKS_PHASE_PRE:
 163      feedback_fn("Pre operation hook failed. Rolling back preparations.")
 164  
 165      master_node = self.cfg.GetMasterNodeInfo().name
 166      remove_result = self.rpc.call_node_ssh_key_remove_light(
 167        [master_node],
 168        self.op.node_name)
 169      remove_result[master_node].Raise(
 170        "Error removing SSH key of node '%s'." % self.op.node_name)
171
172 - def CheckPrereq(self):
173 """Check prerequisites. 174 175 This checks: 176 - the new node is not already in the config 177 - it is resolvable 178 - its parameters (single/dual homed) matches the cluster 179 180 Any errors are signaled by raising errors.OpPrereqError. 181 182 """ 183 node_name = self.hostname.name 184 self.op.primary_ip = self.hostname.ip 185 if self.op.secondary_ip is None: 186 if self.primary_ip_family == netutils.IP6Address.family: 187 raise errors.OpPrereqError("When using a IPv6 primary address, a valid" 188 " IPv4 address must be given as secondary", 189 errors.ECODE_INVAL) 190 self.op.secondary_ip = self.op.primary_ip 191 192 secondary_ip = self.op.secondary_ip 193 if not netutils.IP4Address.IsValid(secondary_ip): 194 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 195 " address" % secondary_ip, errors.ECODE_INVAL) 196 197 existing_node_info = self.cfg.GetNodeInfoByName(node_name) 198 if not self.op.readd and existing_node_info is not None: 199 raise errors.OpPrereqError("Node %s is already in the configuration" % 200 node_name, errors.ECODE_EXISTS) 201 elif self.op.readd and existing_node_info is None: 202 raise errors.OpPrereqError("Node %s is not in the configuration" % 203 node_name, errors.ECODE_NOENT) 204 205 self.changed_primary_ip = False 206 207 for existing_node in self.cfg.GetAllNodesInfo().values(): 208 if self.op.readd and node_name == existing_node.name: 209 if existing_node.secondary_ip != secondary_ip: 210 raise errors.OpPrereqError("Readded node doesn't have the same IP" 211 " address configuration as before", 212 errors.ECODE_INVAL) 213 if existing_node.primary_ip != self.op.primary_ip: 214 self.changed_primary_ip = True 215 216 continue 217 218 if (existing_node.primary_ip == self.op.primary_ip or 219 existing_node.secondary_ip == self.op.primary_ip or 220 existing_node.primary_ip == secondary_ip or 221 existing_node.secondary_ip == secondary_ip): 222 raise errors.OpPrereqError("New node ip address(es) conflict with" 223 " existing node %s" % existing_node.name, 224 errors.ECODE_NOTUNIQUE) 225 226 # After this 'if' block, None is no longer a valid value for the 227 # _capable op attributes 228 if self.op.readd: 229 assert existing_node_info is not None, \ 230 "Can't retrieve locked node %s" % node_name 231 for attr in self._NFLAGS: 232 if getattr(self.op, attr) is None: 233 setattr(self.op, attr, getattr(existing_node_info, attr)) 234 else: 235 for attr in self._NFLAGS: 236 if getattr(self.op, attr) is None: 237 setattr(self.op, attr, True) 238 239 if self.op.readd and not self.op.vm_capable: 240 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid) 241 if pri or sec: 242 raise errors.OpPrereqError("Node %s being re-added with vm_capable" 243 " flag set to false, but it already holds" 244 " instances" % node_name, 245 errors.ECODE_STATE) 246 247 # check that the type of the node (single versus dual homed) is the 248 # same as for the master 249 myself = self.cfg.GetMasterNodeInfo() 250 master_singlehomed = myself.secondary_ip == myself.primary_ip 251 newbie_singlehomed = secondary_ip == self.op.primary_ip 252 if master_singlehomed != newbie_singlehomed: 253 if master_singlehomed: 254 raise errors.OpPrereqError("The master has no secondary ip but the" 255 " new node has one", 256 errors.ECODE_INVAL) 257 else: 258 raise errors.OpPrereqError("The master has a secondary ip but the" 259 " new node doesn't have one", 260 errors.ECODE_INVAL) 261 262 # checks reachability 263 if not netutils.TcpPing(self.op.primary_ip, 
constants.DEFAULT_NODED_PORT): 264 raise errors.OpPrereqError("Node not reachable by ping", 265 errors.ECODE_ENVIRON) 266 267 if not newbie_singlehomed: 268 # check reachability from my secondary ip to newbie's secondary ip 269 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, 270 source=myself.secondary_ip): 271 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 272 " based ping to node daemon port", 273 errors.ECODE_ENVIRON) 274 275 if self.op.readd: 276 exceptions = [existing_node_info.uuid] 277 else: 278 exceptions = [] 279 280 if self.op.master_capable: 281 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) 282 else: 283 self.master_candidate = False 284 285 self.node_group = None 286 if self.op.readd: 287 self.new_node = existing_node_info 288 self.node_group = existing_node_info.group 289 else: 290 self.node_group = self.cfg.LookupNodeGroup(self.op.group) 291 self.new_node = objects.Node(name=node_name, 292 primary_ip=self.op.primary_ip, 293 secondary_ip=secondary_ip, 294 master_candidate=self.master_candidate, 295 offline=False, drained=False, 296 group=self.node_group, ndparams={}) 297 298 if self.op.ndparams: 299 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 300 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 301 "node", "cluster or group") 302 303 if self.op.hv_state: 304 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) 305 306 if self.op.disk_state: 307 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) 308 309 # TODO: If we need to have multiple DnsOnlyRunner we probably should make 310 # it a property on the base class. 311 rpcrunner = rpc.DnsOnlyRunner() 312 result = rpcrunner.call_version([node_name])[node_name] 313 result.Raise("Can't get version information from node %s" % node_name, 314 prereq=True, ecode=errors.ECODE_ENVIRON) 315 if constants.PROTOCOL_VERSION == result.payload: 316 logging.info("Communication to node %s fine, sw version %s match", 317 node_name, result.payload) 318 else: 319 raise errors.OpPrereqError("Version mismatch master version %s," 320 " node version %s" % 321 (constants.PROTOCOL_VERSION, result.payload), 322 errors.ECODE_ENVIRON) 323 324 vg_name = self.cfg.GetVGName() 325 if vg_name is not None: 326 vparams = {constants.NV_PVLIST: [vg_name]} 327 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node) 328 cname = self.cfg.GetClusterName() 329 result = rpcrunner.call_node_verify_light( 330 [node_name], vparams, cname, 331 self.cfg.GetClusterInfo().hvparams, 332 )[node_name] 333 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) 334 if errmsgs: 335 raise errors.OpPrereqError("Checks on node PVs failed: %s" % 336 "; ".join(errmsgs), errors.ECODE_ENVIRON)
337
338 - def _InitOpenVSwitch(self):
339 filled_ndparams = self.cfg.GetClusterInfo().FillND( 340 self.new_node, self.cfg.GetNodeGroup(self.new_node.group)) 341 342 ovs = filled_ndparams.get(constants.ND_OVS, None) 343 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None) 344 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None) 345 346 if ovs: 347 if not ovs_link: 348 self.LogInfo("No physical interface for OpenvSwitch was given." 349 " OpenvSwitch will not have an outside connection. This" 350 " might not be what you want.") 351 352 result = self.rpc.call_node_configure_ovs( 353 self.new_node.name, ovs_name, ovs_link) 354 result.Raise("Failed to initialize OpenVSwitch on new node")
355
356 - def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate, 357 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
358 """Update the SSH setup of all nodes after adding a new node. 359 360 @type readd: boolean 361 @param readd: whether or not this node is readded 362 363 """ 364 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 365 master_node = self.cfg.GetMasterNode() 366 367 if readd: 368 # clear previous keys 369 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 370 remove_result = rpcrunner.call_node_ssh_key_remove( 371 [master_node], 372 new_node_uuid, new_node_name, 373 master_candidate_uuids, 374 potential_master_candidates, 375 True, # from authorized keys 376 True, # from public keys 377 False, # clear authorized keys 378 True, # clear public keys 379 True, # it's a readd 380 self.op.debug, 381 self.op.verbose) 382 remove_result[master_node].Raise( 383 "Could not remove SSH keys of node %s before readding," 384 " (UUID: %s)." % (new_node_name, new_node_uuid)) 385 WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn) 386 387 result = rpcrunner.call_node_ssh_key_add( 388 [master_node], new_node_uuid, new_node_name, 389 potential_master_candidates, 390 is_master_candidate, is_potential_master_candidate, 391 is_potential_master_candidate, self.op.debug, self.op.verbose) 392 393 result[master_node].Raise("Could not update the node's SSH setup.") 394 WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
395
396 - def Exec(self, feedback_fn):
 397    """Adds the new node to the cluster.
 398  
 399    """
 400    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
 401      "Not owning BGL"
 402  
 403    # We are adding a new node, so we assume it's powered
 404    self.new_node.powered = True
 405  
 406    # for re-adds, reset the offline/drained/master-candidate flags;
 407    # we need to reset here, otherwise offline would prevent RPC calls
 408    # later in the procedure; this also means that if the re-add
 409    # fails, we are left with a non-offlined, broken node
 410    if self.op.readd:
 411      self.new_node.offline = False
 412      self.new_node.drained = False
 413      self.LogInfo("Readding a node, the offline/drained flags were reset")
 414      # if we demote the node, we do cleanup later in the procedure
 415      self.new_node.master_candidate = self.master_candidate
 416      if self.changed_primary_ip:
 417        self.new_node.primary_ip = self.op.primary_ip
 418  
 419    # copy the master/vm_capable flags
 420    for attr in self._NFLAGS:
 421      setattr(self.new_node, attr, getattr(self.op, attr))
 422  
 423    # notify the user about any possible mc promotion
 424    if self.new_node.master_candidate:
 425      self.LogInfo("Node will be a master candidate")
 426  
 427    if self.op.ndparams:
 428      self.new_node.ndparams = self.op.ndparams
 429    else:
 430      self.new_node.ndparams = {}
 431  
 432    if self.op.hv_state:
 433      self.new_node.hv_state_static = self.new_hv_state
 434  
 435    if self.op.disk_state:
 436      self.new_node.disk_state_static = self.new_disk_state
 437  
 438    # Add node to our /etc/hosts, and add key to known_hosts
 439    if self.cfg.GetClusterInfo().modify_etc_hosts:
 440      master_node = self.cfg.GetMasterNode()
 441      result = self.rpc.call_etc_hosts_modify(
 442        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
 443        self.hostname.ip)
 444      result.Raise("Can't update hosts file with new host data")
 445  
 446    if self.new_node.secondary_ip != self.new_node.primary_ip:
 447      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
 448                               False)
 449  
 450    node_verifier_uuids = [self.cfg.GetMasterNode()]
 451    node_verify_param = {
 452      constants.NV_NODELIST: ([self.new_node.name], {}, []),
 453      # TODO: do a node-net-test as well?
 454      }
 455  
 456    result = self.rpc.call_node_verify(
 457      node_verifier_uuids, node_verify_param,
 458      self.cfg.GetClusterName(),
 459      self.cfg.GetClusterInfo().hvparams)
 460    for verifier in node_verifier_uuids:
 461      result[verifier].Raise("Cannot communicate with node %s" % verifier)
 462      nl_payload = result[verifier].payload[constants.NV_NODELIST]
 463      if nl_payload:
 464        for failed in nl_payload:
 465          feedback_fn("ssh/hostname verification failed"
 466                      " (checking from %s): %s" %
 467                      (verifier, nl_payload[failed]))
 468        raise errors.OpExecError("ssh/hostname verification failed")
 469  
 470    self._InitOpenVSwitch()
 471  
 472    if self.op.readd:
 473      RedistributeAncillaryFiles(self)
 474      # make sure we redistribute the config
 475      self.cfg.Update(self.new_node, feedback_fn)
 476      # and make sure the new node will not have old files around
 477      if not self.new_node.master_candidate:
 478        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
 479        result.Warn("Node failed to demote itself from master candidate status",
 480                    self.LogWarning)
 481    else:
 482      self.cfg.AddNode(self.new_node, self.proc.GetECId())
 483      RedistributeAncillaryFiles(self)
 484  
 485    # We create a new certificate even if the node is readded
 486    digest = GetClientCertDigest(self, self.new_node.uuid)
 487    if self.new_node.master_candidate:
 488      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
 489    else:
 490      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
 491  
 492    # Ensure that kvmd is in the expected state on the added node.
 493    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid],
 494                      silent_stop=True)
 495  
 496    # Update SSH setup of all nodes
 497    if self.op.node_setup:
 498      # FIXME: so far, all nodes are considered potential master candidates
 499      self._SshUpdate(self.new_node.uuid, self.new_node.name,
 500                      self.new_node.master_candidate, True,
 501                      self.rpc, self.op.readd, feedback_fn)
502 503
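# Editor's sketch: a caller-side opcode for the LU above. The field names
# follow the op attributes referenced in LUNodeAdd; the host name, IP and
# group value are made-up examples, not defaults:
#
#   op = opcodes.OpNodeAdd(node_name="node4.example.com",
#                          secondary_ip="192.0.2.14",
#                          readd=False, group="default",
#                          master_capable=True, vm_capable=True)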
504 -class LUNodeSetParams(LogicalUnit):
505 """Modifies the parameters of a node. 506 507 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 508 to the node role (as _ROLE_*) 509 @cvar _R2F: a dictionary from node role to tuples of flags 510 @cvar _FLAGS: a list of attribute names corresponding to the flags 511 512 """ 513 HPATH = "node-modify" 514 HTYPE = constants.HTYPE_NODE 515 REQ_BGL = False 516 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 517 _F2R = { 518 (True, False, False): _ROLE_CANDIDATE, 519 (False, True, False): _ROLE_DRAINED, 520 (False, False, True): _ROLE_OFFLINE, 521 (False, False, False): _ROLE_REGULAR, 522 } 523 _R2F = dict((v, k) for k, v in _F2R.items()) 524 _FLAGS = ["master_candidate", "drained", "offline"] 525
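  # Editor's sketch: how the role tables above are meant to be read. Flag
  # tuples are ordered like _FLAGS, i.e. (master_candidate, drained,
  # offline), and _R2F simply inverts _F2R:
  #
  #   >>> LUNodeSetParams._F2R[(True, False, False)] == \
  #   ...     LUNodeSetParams._ROLE_CANDIDATE
  #   True
  #   >>> LUNodeSetParams._R2F[LUNodeSetParams._ROLE_REGULAR]
  #   (False, False, False)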
526 - def CheckArguments(self):
527 (self.op.node_uuid, self.op.node_name) = \ 528 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 529 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 530 self.op.master_capable, self.op.vm_capable, 531 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 532 self.op.disk_state] 533 if all_mods.count(None) == len(all_mods): 534 raise errors.OpPrereqError("Please pass at least one modification", 535 errors.ECODE_INVAL) 536 if all_mods.count(True) > 1: 537 raise errors.OpPrereqError("Can't set the node into more than one" 538 " state at the same time", 539 errors.ECODE_INVAL) 540 541 # Boolean value that tells us whether we might be demoting from MC 542 self.might_demote = (self.op.master_candidate is False or 543 self.op.offline is True or 544 self.op.drained is True or 545 self.op.master_capable is False) 546 547 if self.op.secondary_ip: 548 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 549 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 550 " address" % self.op.secondary_ip, 551 errors.ECODE_INVAL) 552 553 self.lock_all = self.op.auto_promote and self.might_demote 554 self.lock_instances = self.op.secondary_ip is not None
555
556 - def _InstanceFilter(self, instance):
557 """Filter for getting affected instances. 558 559 """ 560 disks = self.cfg.GetInstanceDisks(instance.uuid) 561 any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR) 562 return (any_mirrored and 563 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
564
565 - def ExpandNames(self):
566 if self.lock_all: 567 self.needed_locks = { 568 locking.LEVEL_NODE: locking.ALL_SET, 569 } 570 else: 571 self.needed_locks = { 572 locking.LEVEL_NODE: self.op.node_uuid, 573 } 574 575 # Since modifying a node can have severe effects on currently running 576 # operations the resource lock is at least acquired in shared mode 577 self.needed_locks[locking.LEVEL_NODE_RES] = \ 578 self.needed_locks[locking.LEVEL_NODE] 579 580 # Get all locks except nodes in shared mode; they are not used for anything 581 # but read-only access 582 self.share_locks = ShareAll() 583 self.share_locks[locking.LEVEL_NODE] = 0 584 self.share_locks[locking.LEVEL_NODE_RES] = 0 585 586 if self.lock_instances: 587 self.needed_locks[locking.LEVEL_INSTANCE] = \ 588 self.cfg.GetInstanceNames( 589 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
590
591 - def BuildHooksEnv(self):
592 """Build hooks env. 593 594 This runs on the master node. 595 596 """ 597 return { 598 "OP_TARGET": self.op.node_name, 599 "MASTER_CANDIDATE": str(self.op.master_candidate), 600 "OFFLINE": str(self.op.offline), 601 "DRAINED": str(self.op.drained), 602 "MASTER_CAPABLE": str(self.op.master_capable), 603 "VM_CAPABLE": str(self.op.vm_capable), 604 }
605
606 - def BuildHooksNodes(self):
607 """Build hooks nodes. 608 609 """ 610 nl = [self.cfg.GetMasterNode(), self.op.node_uuid] 611 return (nl, nl)
612
613 - def CheckPrereq(self):
614 """Check prerequisites. 615 616 This only checks the instance list against the existing names. 617 618 """ 619 node = self.cfg.GetNodeInfo(self.op.node_uuid) 620 if self.lock_instances: 621 affected_instances = \ 622 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 623 624 # Verify instance locks 625 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 626 wanted_instance_names = frozenset([inst.name for inst in 627 affected_instances.values()]) 628 if wanted_instance_names - owned_instance_names: 629 raise errors.OpPrereqError("Instances affected by changing node %s's" 630 " secondary IP address have changed since" 631 " locks were acquired, wanted '%s', have" 632 " '%s'; retry the operation" % 633 (node.name, 634 utils.CommaJoin(wanted_instance_names), 635 utils.CommaJoin(owned_instance_names)), 636 errors.ECODE_STATE) 637 else: 638 affected_instances = None 639 640 if (self.op.master_candidate is not None or 641 self.op.drained is not None or 642 self.op.offline is not None): 643 # we can't change the master's node flags 644 if node.uuid == self.cfg.GetMasterNode(): 645 raise errors.OpPrereqError("The master role can be changed" 646 " only via master-failover", 647 errors.ECODE_INVAL) 648 649 if self.op.master_candidate and not node.master_capable: 650 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 651 " it a master candidate" % node.name, 652 errors.ECODE_STATE) 653 654 if self.op.vm_capable is False: 655 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) 656 if ipri or isec: 657 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 658 " the vm_capable flag" % node.name, 659 errors.ECODE_STATE) 660 661 if node.master_candidate and self.might_demote and not self.lock_all: 662 assert not self.op.auto_promote, "auto_promote set but lock_all not" 663 # check if after removing the current node, we're missing master 664 # candidates 665 (mc_remaining, mc_should, _) = \ 666 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) 667 if mc_remaining < mc_should: 668 raise errors.OpPrereqError("Not enough master candidates, please" 669 " pass auto promote option to allow" 670 " promotion (--auto-promote or RAPI" 671 " auto_promote=True)", errors.ECODE_STATE) 672 673 self.old_flags = old_flags = (node.master_candidate, 674 node.drained, node.offline) 675 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 676 self.old_role = old_role = self._F2R[old_flags] 677 678 # Check for ineffective changes 679 for attr in self._FLAGS: 680 if getattr(self.op, attr) is False and getattr(node, attr) is False: 681 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 682 setattr(self.op, attr, None) 683 684 # Past this point, any flag change to False means a transition 685 # away from the respective state, as only real changes are kept 686 687 # TODO: We might query the real power state if it supports OOB 688 if SupportsOob(self.cfg, node): 689 if self.op.offline is False and not (node.powered or 690 self.op.powered is True): 691 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 692 " offline status can be reset") % 693 self.op.node_name, errors.ECODE_STATE) 694 elif self.op.powered is not None: 695 raise errors.OpPrereqError(("Unable to change powered state for node %s" 696 " as it does not support out-of-band" 697 " handling") % self.op.node_name, 698 errors.ECODE_STATE) 699 700 # If we're being deofflined/drained, we'll MC ourself if needed 701 if (self.op.drained is False or 
self.op.offline is False or 702 (self.op.master_capable and not node.master_capable)): 703 if _DecideSelfPromotion(self): 704 self.op.master_candidate = True 705 self.LogInfo("Auto-promoting node to master candidate") 706 707 # If we're no longer master capable, we'll demote ourselves from MC 708 if self.op.master_capable is False and node.master_candidate: 709 if self.op.node_uuid == self.cfg.GetMasterNode(): 710 raise errors.OpPrereqError("Master must remain master capable", 711 errors.ECODE_STATE) 712 self.LogInfo("Demoting from master candidate") 713 self.op.master_candidate = False 714 715 # Compute new role 716 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 717 if self.op.master_candidate: 718 new_role = self._ROLE_CANDIDATE 719 elif self.op.drained: 720 new_role = self._ROLE_DRAINED 721 elif self.op.offline: 722 new_role = self._ROLE_OFFLINE 723 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 724 # False is still in new flags, which means we're un-setting (the 725 # only) True flag 726 new_role = self._ROLE_REGULAR 727 else: # no new flags, nothing, keep old role 728 new_role = old_role 729 730 self.new_role = new_role 731 732 if old_role == self._ROLE_OFFLINE and new_role != old_role: 733 # Trying to transition out of offline status 734 result = self.rpc.call_version([node.uuid])[node.uuid] 735 if result.fail_msg: 736 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 737 " to report its version: %s" % 738 (node.name, result.fail_msg), 739 errors.ECODE_STATE) 740 else: 741 self.LogWarning("Transitioning node from offline to online state" 742 " without using re-add. Please make sure the node" 743 " is healthy!") 744 745 # When changing the secondary ip, verify if this is a single-homed to 746 # multi-homed transition or vice versa, and apply the relevant 747 # restrictions. 
748 if self.op.secondary_ip: 749 # Ok even without locking, because this can't be changed by any LU 750 master = self.cfg.GetMasterNodeInfo() 751 master_singlehomed = master.secondary_ip == master.primary_ip 752 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 753 if self.op.force and node.uuid == master.uuid: 754 self.LogWarning("Transitioning from single-homed to multi-homed" 755 " cluster; all nodes will require a secondary IP" 756 " address") 757 else: 758 raise errors.OpPrereqError("Changing the secondary ip on a" 759 " single-homed cluster requires the" 760 " --force option to be passed, and the" 761 " target node to be the master", 762 errors.ECODE_INVAL) 763 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 764 if self.op.force and node.uuid == master.uuid: 765 self.LogWarning("Transitioning from multi-homed to single-homed" 766 " cluster; secondary IP addresses will have to be" 767 " removed") 768 else: 769 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 770 " same as the primary IP on a multi-homed" 771 " cluster, unless the --force option is" 772 " passed, and the target node is the" 773 " master", errors.ECODE_INVAL) 774 775 assert not (set([inst.name for inst in affected_instances.values()]) - 776 self.owned_locks(locking.LEVEL_INSTANCE)) 777 778 if node.offline: 779 if affected_instances: 780 msg = ("Cannot change secondary IP address: offline node has" 781 " instances (%s) configured to use it" % 782 utils.CommaJoin( 783 [inst.name for inst in affected_instances.values()])) 784 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 785 else: 786 # On online nodes, check that no instances are running, and that 787 # the node has the new ip and we can reach it. 788 for instance in affected_instances.values(): 789 CheckInstanceState(self, instance, INSTANCE_DOWN, 790 msg="cannot change secondary ip") 791 792 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) 793 if master.uuid != node.uuid: 794 # check reachability from master secondary ip to new secondary ip 795 if not netutils.TcpPing(self.op.secondary_ip, 796 constants.DEFAULT_NODED_PORT, 797 source=master.secondary_ip): 798 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 799 " based ping to node daemon port", 800 errors.ECODE_ENVIRON) 801 802 if self.op.ndparams: 803 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) 804 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 805 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 806 "node", "cluster or group") 807 self.new_ndparams = new_ndparams 808 809 if self.op.hv_state: 810 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 811 node.hv_state_static) 812 813 if self.op.disk_state: 814 self.new_disk_state = \ 815 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
816
817 - def Exec(self, feedback_fn):
 818    """Modifies a node.
 819  
 820    """
 821    node = self.cfg.GetNodeInfo(self.op.node_uuid)
 822    result = []
 823  
 824    if self.op.ndparams:
 825      node.ndparams = self.new_ndparams
 826  
 827    if self.op.powered is not None:
 828      node.powered = self.op.powered
 829  
 830    if self.op.hv_state:
 831      node.hv_state_static = self.new_hv_state
 832  
 833    if self.op.disk_state:
 834      node.disk_state_static = self.new_disk_state
 835  
 836    for attr in ["master_capable", "vm_capable"]:
 837      val = getattr(self.op, attr)
 838      if val is not None:
 839        setattr(node, attr, val)
 840        result.append((attr, str(val)))
 841  
 842    if self.op.secondary_ip:
 843      node.secondary_ip = self.op.secondary_ip
 844      result.append(("secondary_ip", self.op.secondary_ip))
 845  
 846    # this will trigger configuration file update, if needed
 847    self.cfg.Update(node, feedback_fn)
 848    master_node = self.cfg.GetMasterNode()
 849    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
 850    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 851  
 852    if self.new_role != self.old_role:
 853      new_flags = self._R2F[self.new_role]
 854      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
 855        if of != nf:
 856          result.append((desc, str(nf)))
 857      (node.master_candidate, node.drained, node.offline) = new_flags
 858      self.cfg.Update(node, feedback_fn)
 859  
 860      # Tell the node to demote itself, if no longer MC and not offline.
 861      # This must be done only after the configuration is updated so that
 862      # it's ensured the node won't receive any further configuration updates.
 863      if self.old_role == self._ROLE_CANDIDATE and \
 864          self.new_role != self._ROLE_OFFLINE:
 865        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
 866        if msg:
 867          self.LogWarning("Node failed to demote itself: %s", msg)
 868  
 869      # we locked all nodes, we adjust the CP before updating this node
 870      if self.lock_all:
 871        AdjustCandidatePool(
 872          self, [node.uuid], master_node, potential_master_candidates,
 873          feedback_fn, modify_ssh_setup)
 874  
 875      # if the node gets promoted, grant RPC privileges
 876      if self.new_role == self._ROLE_CANDIDATE:
 877        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
 878      # if the node is demoted, revoke RPC privileges
 879      if self.old_role == self._ROLE_CANDIDATE:
 880        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)
 881  
 882      # KVM configuration never changes here, so disable warnings if KVM is disabled.
 883      silent_stop = constants.HT_KVM not in \
 884          self.cfg.GetClusterInfo().enabled_hypervisors
 885      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid],
 886                        silent_stop=silent_stop)
 887  
 888      # this will trigger job queue propagation or cleanup if the mc
 889      # flag changed
 890      if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
 891  
 892        if modify_ssh_setup:
 893          if self.old_role == self._ROLE_CANDIDATE:
 894            master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
 895            ssh_result = self.rpc.call_node_ssh_key_remove(
 896              [master_node],
 897              node.uuid, node.name,
 898              master_candidate_uuids, potential_master_candidates,
 899              True, # remove node's key from all nodes' authorized_keys file
 900              False, # currently, all nodes are potential master candidates
 901              False, # do not clear node's 'authorized_keys'
 902              False, # do not clear node's 'ganeti_pub_keys'
 903              False, # no readd
 904              self.op.debug,
 905              self.op.verbose)
 906            ssh_result[master_node].Raise(
 907              "Could not adjust the SSH setup after demoting node '%s'"
 908              " (UUID: %s)." % (node.name, node.uuid))
 909            WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)
 910  
 911          if self.new_role == self._ROLE_CANDIDATE:
 912            AddMasterCandidateSshKey(
 913              self, master_node, node, potential_master_candidates, feedback_fn)
 914  
 915    return result
916 917
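# Editor's sketch: a caller-side opcode for the LU above, draining a node.
# At most one of the role flags (master_candidate/drained/offline) may be set
# per invocation, as enforced in CheckArguments; the values are made up:
#
#   op = opcodes.OpNodeSetParams(node_name="node4.example.com",
#                                drained=True, auto_promote=True)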
918 -class LUNodePowercycle(NoHooksLU):
919 """Powercycles a node. 920 921 """ 922 REQ_BGL = False 923
924 - def CheckArguments(self):
925 (self.op.node_uuid, self.op.node_name) = \ 926 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 927 928 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force: 929 raise errors.OpPrereqError("The node is the master and the force" 930 " parameter was not set", 931 errors.ECODE_INVAL)
932
933 - def ExpandNames(self):
934 """Locking for PowercycleNode. 935 936 This is a last-resort option and shouldn't block on other 937 jobs. Therefore, we grab no locks. 938 939 """ 940 self.needed_locks = {}
941
942 - def Exec(self, feedback_fn):
943 """Reboots a node. 944 945 """ 946 default_hypervisor = self.cfg.GetHypervisorType() 947 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor] 948 result = self.rpc.call_node_powercycle(self.op.node_uuid, 949 default_hypervisor, 950 hvparams) 951 result.Raise("Failed to schedule the reboot") 952 return result.payload
953 954
955 -def _GetNodeInstancesInner(cfg, fn):
956 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
957 958
959 -def _GetNodePrimaryInstances(cfg, node_uuid):
960 """Returns primary instances on a node. 961 962 """ 963 return _GetNodeInstancesInner(cfg, 964 lambda inst: node_uuid == inst.primary_node)
965 966
967 -def _GetNodeSecondaryInstances(cfg, node_uuid):
968 """Returns secondary instances on a node. 969 970 """ 971 return _GetNodeInstancesInner(cfg, 972 lambda inst: node_uuid in 973 cfg.GetInstanceSecondaryNodes(inst.uuid))
974 975
976 -def _GetNodeInstances(cfg, node_uuid):
977 """Returns a list of all primary and secondary instances on a node. 978 979 """ 980 981 return _GetNodeInstancesInner(cfg, 982 lambda inst: node_uuid in 983 cfg.GetInstanceNodes(inst.uuid))
984 985
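# Editor's sketch: how the three helpers above relate. Assuming an instance
# never lists the same node as both primary and secondary, the primary and
# secondary lists partition the full per-node list (`cfg` and `node_uuid`
# are hypothetical here):
#
#   >>> pri = _GetNodePrimaryInstances(cfg, node_uuid)
#   >>> sec = _GetNodeSecondaryInstances(cfg, node_uuid)
#   >>> len(_GetNodeInstances(cfg, node_uuid)) == len(pri) + len(sec)
#   True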
986 -class LUNodeEvacuate(NoHooksLU):
987 """Evacuates instances off a list of nodes. 988 989 """ 990 REQ_BGL = False 991
992 - def CheckArguments(self):
993 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
994
995 - def ExpandNames(self):
996 (self.op.node_uuid, self.op.node_name) = \ 997 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 998 999 if self.op.remote_node is not None: 1000 (self.op.remote_node_uuid, self.op.remote_node) = \ 1001 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 1002 self.op.remote_node) 1003 assert self.op.remote_node 1004 1005 if self.op.node_uuid == self.op.remote_node_uuid: 1006 raise errors.OpPrereqError("Can not use evacuated node as a new" 1007 " secondary node", errors.ECODE_INVAL) 1008 1009 if self.op.mode != constants.NODE_EVAC_SEC: 1010 raise errors.OpPrereqError("Without the use of an iallocator only" 1011 " secondary instances can be evacuated", 1012 errors.ECODE_INVAL) 1013 1014 # Declare locks 1015 self.share_locks = ShareAll() 1016 self.needed_locks = { 1017 locking.LEVEL_INSTANCE: [], 1018 locking.LEVEL_NODEGROUP: [], 1019 locking.LEVEL_NODE: [], 1020 } 1021 1022 # Determine nodes (via group) optimistically, needs verification once locks 1023 # have been acquired 1024 self.lock_nodes = self._DetermineNodes()
1025
1026 - def _DetermineNodes(self):
1027 """Gets the list of node UUIDs to operate on. 1028 1029 """ 1030 if self.op.remote_node is None: 1031 # Iallocator will choose any node(s) in the same group 1032 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) 1033 else: 1034 group_nodes = frozenset([self.op.remote_node_uuid]) 1035 1036 # Determine nodes to be locked 1037 return set([self.op.node_uuid]) | group_nodes
1038
1039 - def _DetermineInstances(self):
1040 """Builds list of instances to operate on. 1041 1042 """ 1043 assert self.op.mode in constants.NODE_EVAC_MODES 1044 1045 if self.op.mode == constants.NODE_EVAC_PRI: 1046 # Primary instances only 1047 inst_fn = _GetNodePrimaryInstances 1048 assert self.op.remote_node is None, \ 1049 "Evacuating primary instances requires iallocator" 1050 elif self.op.mode == constants.NODE_EVAC_SEC: 1051 # Secondary instances only 1052 inst_fn = _GetNodeSecondaryInstances 1053 else: 1054 # All instances 1055 assert self.op.mode == constants.NODE_EVAC_ALL 1056 inst_fn = _GetNodeInstances 1057 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 1058 # per instance 1059 raise errors.OpPrereqError("Due to an issue with the iallocator" 1060 " interface it is not possible to evacuate" 1061 " all instances at once; specify explicitly" 1062 " whether to evacuate primary or secondary" 1063 " instances", 1064 errors.ECODE_INVAL) 1065 1066 return inst_fn(self.cfg, self.op.node_uuid)
1067
1068 - def DeclareLocks(self, level):
1069 if level == locking.LEVEL_INSTANCE: 1070 # Lock instances optimistically, needs verification once node and group 1071 # locks have been acquired 1072 self.needed_locks[locking.LEVEL_INSTANCE] = \ 1073 set(i.name for i in self._DetermineInstances()) 1074 1075 elif level == locking.LEVEL_NODEGROUP: 1076 # Lock node groups for all potential target nodes optimistically, needs 1077 # verification once nodes have been acquired 1078 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 1079 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 1080 1081 elif level == locking.LEVEL_NODE: 1082 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
1083
1084 - def CheckPrereq(self):
1085    # Verify locks
1086    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
1087    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
1088    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1089  
1090    need_nodes = self._DetermineNodes()
1091  
1092    if not owned_nodes.issuperset(need_nodes):
1093      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
1094                                 " locks were acquired, current nodes are"
1095                                 " '%s', used to be '%s'; retry the"
1096                                 " operation" %
1097                                 (self.op.node_name,
1098                                  utils.CommaJoin(need_nodes),
1099                                  utils.CommaJoin(owned_nodes)),
1100                                 errors.ECODE_STATE)
1101  
1102    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
1103    if owned_groups != wanted_groups:
1104      raise errors.OpExecError("Node groups changed since locks were acquired,"
1105                               " current groups are '%s', used to be '%s';"
1106                               " retry the operation" %
1107                               (utils.CommaJoin(wanted_groups),
1108                                utils.CommaJoin(owned_groups)))
1109  
1110    # Determine affected instances
1111    self.instances = self._DetermineInstances()
1112    self.instance_names = [i.name for i in self.instances]
1113  
1114    if set(self.instance_names) != owned_instance_names:
1115      raise errors.OpExecError("Instances on node '%s' changed since locks"
1116                               " were acquired, current instances are '%s',"
1117                               " used to be '%s'; retry the operation" %
1118                               (self.op.node_name,
1119                                utils.CommaJoin(self.instance_names),
1120                                utils.CommaJoin(owned_instance_names)))
1121  
1122    if self.instance_names:
1123      self.LogInfo("Evacuating instances from node '%s': %s",
1124                   self.op.node_name,
1125                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
1126    else:
1127      self.LogInfo("No instances to evacuate from node '%s'",
1128                   self.op.node_name)
1129  
1130    if self.op.remote_node is not None:
1131      for i in self.instances:
1132        if i.primary_node == self.op.remote_node_uuid:
1133          raise errors.OpPrereqError("Node %s is the primary node of"
1134                                     " instance %s, cannot use it as"
1135                                     " secondary" %
1136                                     (self.op.remote_node, i.name),
1137                                     errors.ECODE_INVAL)
1138
1139 - def Exec(self, feedback_fn):
1140 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 1141 1142 if not self.instance_names: 1143 # No instances to evacuate 1144 jobs = [] 1145 1146 elif self.op.iallocator is not None: 1147 # TODO: Implement relocation to other group 1148 req = iallocator.IAReqNodeEvac( 1149 evac_mode=self.op.mode, instances=list(self.instance_names), 1150 ignore_soft_errors=self.op.ignore_soft_errors) 1151 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 1152 1153 ial.Run(self.op.iallocator) 1154 1155 if not ial.success: 1156 raise errors.OpPrereqError("Can't compute node evacuation using" 1157 " iallocator '%s': %s" % 1158 (self.op.iallocator, ial.info), 1159 errors.ECODE_NORES) 1160 1161 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1162 1163 elif self.op.remote_node is not None: 1164 assert self.op.mode == constants.NODE_EVAC_SEC 1165 jobs = [ 1166 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1167 remote_node=self.op.remote_node, 1168 disks=[], 1169 mode=constants.REPLACE_DISK_CHG, 1170 early_release=self.op.early_release)] 1171 for instance_name in self.instance_names] 1172 1173 else: 1174 raise errors.ProgrammerError("No iallocator or remote node") 1175 1176 return ResultWithJobs(jobs)
1177 1178
1179 -class LUNodeMigrate(LogicalUnit):
1180 """Migrate all instances from a node. 1181 1182 """ 1183 HPATH = "node-migrate" 1184 HTYPE = constants.HTYPE_NODE 1185 REQ_BGL = False 1186
1187 - def CheckArguments(self):
1188 pass
1189
1190 - def ExpandNames(self):
1191 (self.op.node_uuid, self.op.node_name) = \ 1192 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1193 1194 self.share_locks = ShareAll() 1195 self.needed_locks = { 1196 locking.LEVEL_NODE: [self.op.node_uuid], 1197 }
1198
1199 - def BuildHooksEnv(self):
1200 """Build hooks env. 1201 1202 This runs on the master, the primary and all the secondaries. 1203 1204 """ 1205 return { 1206 "NODE_NAME": self.op.node_name, 1207 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 1208 }
1209
1210 - def BuildHooksNodes(self):
1211 """Build hooks nodes. 1212 1213 """ 1214 nl = [self.cfg.GetMasterNode()] 1215 return (nl, nl)
1216
1217 - def CheckPrereq(self):
1218 pass
1219
1220 - def Exec(self, feedback_fn):
1221 # Prepare jobs for migration instances 1222 jobs = [ 1223 [opcodes.OpInstanceMigrate( 1224 instance_name=inst.name, 1225 mode=self.op.mode, 1226 live=self.op.live, 1227 iallocator=self.op.iallocator, 1228 target_node=self.op.target_node, 1229 allow_runtime_changes=self.op.allow_runtime_changes, 1230 ignore_ipolicy=self.op.ignore_ipolicy)] 1231 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)] 1232 1233 # TODO: Run iallocator in this opcode and pass correct placement options to 1234 # OpInstanceMigrate. Since other jobs can modify the cluster between 1235 # running the iallocator and the actual migration, a good consistency model 1236 # will have to be found. 1237 1238 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == 1239 frozenset([self.op.node_uuid])) 1240 1241 return ResultWithJobs(jobs)
1242 1243
1244 -def _GetStorageTypeArgs(cfg, storage_type):
1245 """Returns the arguments for a storage type. 1246 1247 """ 1248 # Special case for file storage 1249 1250 if storage_type == constants.ST_FILE: 1251 return [[cfg.GetFileStorageDir()]] 1252 elif storage_type == constants.ST_SHARED_FILE: 1253 return [[cfg.GetSharedFileStorageDir()]] 1254 elif storage_type == constants.ST_GLUSTER: 1255 return [[cfg.GetGlusterStorageDir()]] 1256 else: 1257 return []
1258 1259
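# Editor's sketch (hypothetical cfg): the helper above returns the extra RPC
# arguments as a list of argument lists, e.g. the configured directory for
# file storage, and an empty list for types that need no extra arguments.
# The path shown is only an example value:
#
#   >>> _GetStorageTypeArgs(cfg, constants.ST_FILE)
#   [['/srv/ganeti/file-storage']]   # whatever cfg.GetFileStorageDir() says
#   >>> _GetStorageTypeArgs(cfg, constants.ST_LVM_VG)
#   []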
1260 -class LUNodeModifyStorage(NoHooksLU):
1261 """Logical unit for modifying a storage volume on a node. 1262 1263 """ 1264 REQ_BGL = False 1265
1266 - def CheckArguments(self):
1267 (self.op.node_uuid, self.op.node_name) = \ 1268 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1269 1270 storage_type = self.op.storage_type 1271 1272 try: 1273 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] 1274 except KeyError: 1275 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1276 " modified" % storage_type, 1277 errors.ECODE_INVAL) 1278 1279 diff = set(self.op.changes.keys()) - modifiable 1280 if diff: 1281 raise errors.OpPrereqError("The following fields can not be modified for" 1282 " storage units of type '%s': %r" % 1283 (storage_type, list(diff)), 1284 errors.ECODE_INVAL)
1285
1286 - def CheckPrereq(self):
1287 """Check prerequisites. 1288 1289 """ 1290 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1291
1292 - def ExpandNames(self):
1293 self.needed_locks = { 1294 locking.LEVEL_NODE: self.op.node_uuid, 1295 }
1296
1297 - def Exec(self, feedback_fn):
1298 """Computes the list of nodes and their attributes. 1299 1300 """ 1301 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1302 result = self.rpc.call_storage_modify(self.op.node_uuid, 1303 self.op.storage_type, st_args, 1304 self.op.name, self.op.changes) 1305 result.Raise("Failed to modify storage unit '%s' on %s" % 1306 (self.op.name, self.op.node_name))
1307 1308
1309 -def _CheckOutputFields(fields, selected):
1310  """Checks whether all selected fields are valid according to fields.
1311  
1312  @type fields: L{utils.FieldSet}
1313  @param fields: the set of valid output fields
1314  @type selected: list of strings
1315  @param selected: the output fields selected by the caller
1316  
1317  """
1318  delta = fields.NonMatching(selected)
1319  if delta:
1320    raise errors.OpPrereqError("Unknown output fields selected: %s"
1321                               % ",".join(delta), errors.ECODE_INVAL)
1322 1323
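# Editor's sketch: _CheckOutputFields with a made-up field set. The FieldSet
# matches each selected name against its patterns; unknown names raise an
# error:
#
#   >>> fields = utils.FieldSet("name", "size")
#   >>> _CheckOutputFields(fields, ["name"])    # passes silently
#   >>> _CheckOutputFields(fields, ["bogus"])   # raises errors.OpPrereqError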
1324 -class LUNodeQueryvols(NoHooksLU):
1325 """Logical unit for getting volumes on node(s). 1326 1327 """ 1328 REQ_BGL = False 1329
1330 - def CheckArguments(self):
1331    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1332                                      constants.VF_VG, constants.VF_NAME,
1333                                      constants.VF_SIZE, constants.VF_INSTANCE),
1334                       self.op.output_fields)
1335  
1336 - def ExpandNames(self):
1337 self.share_locks = ShareAll() 1338 1339 if self.op.nodes: 1340 self.needed_locks = { 1341 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1342 } 1343 else: 1344 self.needed_locks = { 1345 locking.LEVEL_NODE: locking.ALL_SET, 1346 }
1347
1348 - def Exec(self, feedback_fn):
1349 """Computes the list of nodes and their attributes. 1350 1351 """ 1352 node_uuids = self.owned_locks(locking.LEVEL_NODE) 1353 volumes = self.rpc.call_node_volumes(node_uuids) 1354 1355 ilist = self.cfg.GetAllInstancesInfo() 1356 vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values()) 1357 1358 output = [] 1359 for node_uuid in node_uuids: 1360 nresult = volumes[node_uuid] 1361 if nresult.offline: 1362 continue 1363 msg = nresult.fail_msg 1364 if msg: 1365 self.LogWarning("Can't compute volume data on node %s: %s", 1366 self.cfg.GetNodeName(node_uuid), msg) 1367 continue 1368 1369 node_vols = sorted(nresult.payload, 1370 key=operator.itemgetter(constants.VF_DEV)) 1371 1372 for vol in node_vols: 1373 node_output = [] 1374 for field in self.op.output_fields: 1375 if field == constants.VF_NODE: 1376 val = self.cfg.GetNodeName(node_uuid) 1377 elif field == constants.VF_PHYS: 1378 val = vol[constants.VF_DEV] 1379 elif field == constants.VF_VG: 1380 val = vol[constants.VF_VG] 1381 elif field == constants.VF_NAME: 1382 val = vol[constants.VF_NAME] 1383 elif field == constants.VF_SIZE: 1384 val = int(float(vol[constants.VF_SIZE])) 1385 elif field == constants.VF_INSTANCE: 1386 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" + 1387 vol[constants.VF_NAME]), None) 1388 if inst is not None: 1389 val = inst.name 1390 else: 1391 val = "-" 1392 else: 1393 raise errors.ParameterError(field) 1394 node_output.append(str(val)) 1395 1396 output.append(node_output) 1397 1398 return output
1399 1400
1401 -class LUNodeQueryStorage(NoHooksLU):
1402 """Logical unit for getting information on storage units on node(s). 1403 1404 """ 1405 REQ_BGL = False 1406
1407 - def CheckArguments(self):
1408 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS), 1409 self.op.output_fields)
1410
1411 - def ExpandNames(self):
1412 self.share_locks = ShareAll() 1413 1414 if self.op.nodes: 1415 self.needed_locks = { 1416 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1417 } 1418 else: 1419 self.needed_locks = { 1420 locking.LEVEL_NODE: locking.ALL_SET, 1421 }
1422
1423 - def _DetermineStorageType(self):
1424 """Determines the default storage type of the cluster. 1425 1426 """ 1427 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates 1428 default_storage_type = \ 1429 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]] 1430 return default_storage_type
1431
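  # Editor's sketch: the fallback above keys off the cluster's first enabled
  # disk template; e.g. with a template list starting with plain or drbd, the
  # standard template-to-storage mapping (assumed here) yields LVM
  # volume-group storage:
  #
  #   >>> constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[constants.DT_PLAIN]
  #   'lvm-vg'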
1432 - def CheckPrereq(self):
1433 """Check prerequisites. 1434 1435 """ 1436 if self.op.storage_type: 1437 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1438 self.storage_type = self.op.storage_type 1439 else: 1440 self.storage_type = self._DetermineStorageType() 1441 supported_storage_types = constants.STS_REPORT_NODE_STORAGE 1442 if self.storage_type not in supported_storage_types: 1443 raise errors.OpPrereqError( 1444 "Storage reporting for storage type '%s' is not supported. Please" 1445 " use the --storage-type option to specify one of the supported" 1446 " storage types (%s) or set the default disk template to one that" 1447 " supports storage reporting." % 1448 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1449
1450 - def Exec(self, feedback_fn):
1451 """Computes the list of nodes and their attributes. 1452 1453 """ 1454 if self.op.storage_type: 1455 self.storage_type = self.op.storage_type 1456 else: 1457 self.storage_type = self._DetermineStorageType() 1458 1459 self.node_uuids = self.owned_locks(locking.LEVEL_NODE) 1460 1461 # Always get name to sort by 1462 if constants.SF_NAME in self.op.output_fields: 1463 fields = self.op.output_fields[:] 1464 else: 1465 fields = [constants.SF_NAME] + self.op.output_fields 1466 1467 # Never ask for node or type as it's only known to the LU 1468 for extra in [constants.SF_NODE, constants.SF_TYPE]: 1469 while extra in fields: 1470 fields.remove(extra) 1471 1472 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) 1473 name_idx = field_idx[constants.SF_NAME] 1474 1475 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type) 1476 data = self.rpc.call_storage_list(self.node_uuids, 1477 self.storage_type, st_args, 1478 self.op.name, fields) 1479 1480 result = [] 1481 1482 for node_uuid in utils.NiceSort(self.node_uuids): 1483 node_name = self.cfg.GetNodeName(node_uuid) 1484 nresult = data[node_uuid] 1485 if nresult.offline: 1486 continue 1487 1488 msg = nresult.fail_msg 1489 if msg: 1490 self.LogWarning("Can't get storage data from node %s: %s", 1491 node_name, msg) 1492 continue 1493 1494 rows = dict([(row[name_idx], row) for row in nresult.payload]) 1495 1496 for name in utils.NiceSort(rows.keys()): 1497 row = rows[name] 1498 1499 out = [] 1500 1501 for field in self.op.output_fields: 1502 if field == constants.SF_NODE: 1503 val = node_name 1504 elif field == constants.SF_TYPE: 1505 val = self.storage_type 1506 elif field in field_idx: 1507 val = row[field_idx[field]] 1508 else: 1509 raise errors.ParameterError(field) 1510 1511 out.append(val) 1512 1513 result.append(out) 1514 1515 return result
1516 1517
1518 -class LUNodeRemove(LogicalUnit):
1519 """Logical unit for removing a node. 1520 1521 """ 1522 HPATH = "node-remove" 1523 HTYPE = constants.HTYPE_NODE 1524
1525 - def BuildHooksEnv(self):
1526 """Build hooks env. 1527 1528 """ 1529 return { 1530 "OP_TARGET": self.op.node_name, 1531 "NODE_NAME": self.op.node_name, 1532 }
1533
1534 - def BuildHooksNodes(self):
1535 """Build hooks nodes. 1536 1537 This doesn't run on the target node in the pre phase as a failed 1538 node would then be impossible to remove. 1539 1540 """ 1541 all_nodes = self.cfg.GetNodeList() 1542 try: 1543 all_nodes.remove(self.op.node_uuid) 1544 except ValueError: 1545 pass 1546 return (all_nodes, all_nodes)
1547
1548 - def CheckPrereq(self):
1549 """Check prerequisites. 1550 1551 This checks: 1552 - the node exists in the configuration 1553 - it does not have primary or secondary instances 1554 - it's not the master 1555 1556 Any errors are signaled by raising errors.OpPrereqError. 1557 1558 """ 1559 (self.op.node_uuid, self.op.node_name) = \ 1560 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1561 node = self.cfg.GetNodeInfo(self.op.node_uuid) 1562 assert node is not None 1563 1564 masternode = self.cfg.GetMasterNode() 1565 if node.uuid == masternode: 1566 raise errors.OpPrereqError("Node is the master node, failover to another" 1567 " node is required", errors.ECODE_INVAL) 1568 1569 for _, instance in self.cfg.GetAllInstancesInfo().items(): 1570 if node.uuid in self.cfg.GetInstanceNodes(instance.uuid): 1571 raise errors.OpPrereqError("Instance %s is still running on the node," 1572 " please remove first" % instance.name, 1573 errors.ECODE_INVAL) 1574 self.op.node_name = node.name 1575 self.node = node
1576
1577 - def Exec(self, feedback_fn):
1578 """Removes the node from the cluster. 1579 1580 """ 1581 logging.info("Stopping the node daemon and removing configs from node %s", 1582 self.node.name) 1583 1584 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1585 1586 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 1587 "Not owning BGL" 1588 1589 master_node = self.cfg.GetMasterNode() 1590 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 1591 if modify_ssh_setup: 1592 # retrieve the list of potential master candidates before the node is 1593 # removed 1594 potential_master_candidate = \ 1595 self.op.node_name in potential_master_candidates 1596 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 1597 result = self.rpc.call_node_ssh_key_remove( 1598 [master_node], 1599 self.node.uuid, self.op.node_name, 1600 master_candidate_uuids, potential_master_candidates, 1601 self.node.master_candidate, # from_authorized_keys 1602 potential_master_candidate, # from_public_keys 1603 True, # clear node's 'authorized_keys' 1604 True, # clear node's 'ganeti_public_keys' 1605 False, # no readd 1606 self.op.debug, 1607 self.op.verbose) 1608 result[master_node].Raise( 1609 "Could not remove the SSH key of node '%s' (UUID: %s)." % 1610 (self.op.node_name, self.node.uuid)) 1611 WarnAboutFailedSshUpdates(result, master_node, feedback_fn) 1612 1613 # Promote nodes to master candidate as needed 1614 AdjustCandidatePool( 1615 self, [self.node.uuid], master_node, potential_master_candidates, 1616 feedback_fn, modify_ssh_setup) 1617 self.cfg.RemoveNode(self.node.uuid) 1618 1619 # Run post hooks on the node before it's removed 1620 RunPostHook(self, self.node.uuid) 1621 1622 # we have to call this by name rather than by UUID, as the node is no longer 1623 # in the config 1624 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup) 1625 msg = result.fail_msg 1626 if msg: 1627 self.LogWarning("Errors encountered on the remote node while leaving" 1628 " the cluster: %s", msg) 1629 1630 cluster = self.cfg.GetClusterInfo() 1631 1632 # Remove node from candidate certificate list 1633 if self.node.master_candidate: 1634 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid) 1635 1636 # Remove node from our /etc/hosts 1637 if cluster.modify_etc_hosts: 1638 master_node_uuid = self.cfg.GetMasterNode() 1639 result = self.rpc.call_etc_hosts_modify(master_node_uuid, 1640 constants.ETC_HOSTS_REMOVE, 1641 self.node.name, None) 1642 result.Raise("Can't update hosts file with new host data") 1643 RedistributeAncillaryFiles(self)
1644 1645
1646 -class LURepairNodeStorage(NoHooksLU):
1647 """Repairs the volume group on a node. 1648 1649 """ 1650 REQ_BGL = False 1651
1652 - def CheckArguments(self):
1653 (self.op.node_uuid, self.op.node_name) = \ 1654 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1655 1656 storage_type = self.op.storage_type 1657 1658 if (constants.SO_FIX_CONSISTENCY not in 1659 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): 1660 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1661 " repaired" % storage_type, 1662 errors.ECODE_INVAL)
1663
1664 - def ExpandNames(self):
1665 self.needed_locks = { 1666 locking.LEVEL_NODE: [self.op.node_uuid], 1667 }
1668
1669 - def _CheckFaultyDisks(self, instance, node_uuid):
1670    """Ensure faulty disks abort the opcode or at least warn."""
1671    try:
1672      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1673                                 node_uuid, True):
1674        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1675                                   " node '%s'" %
1676                                   (instance.name,
1677                                    self.cfg.GetNodeName(node_uuid)),
1678                                   errors.ECODE_STATE)
1679    except errors.OpPrereqError as err:
1680      if self.op.ignore_consistency:
1681        self.LogWarning(str(err.args[0]))
1682      else:
1683        raise
1684
1685 - def CheckPrereq(self):
1686 """Check prerequisites. 1687 1688 """ 1689 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1690 1691 # Check whether any instance on this node has faulty disks 1692 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid): 1693 if not inst.disks_active: 1694 continue 1695 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid)) 1696 check_nodes.discard(self.op.node_uuid) 1697 for inst_node_uuid in check_nodes: 1698 self._CheckFaultyDisks(inst, inst_node_uuid)
1699
1700 - def Exec(self, feedback_fn):
1701 feedback_fn("Repairing storage unit '%s' on %s ..." % 1702 (self.op.name, self.op.node_name)) 1703 1704 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1705 result = self.rpc.call_storage_execute(self.op.node_uuid, 1706 self.op.storage_type, st_args, 1707 self.op.name, 1708 constants.SO_FIX_CONSISTENCY) 1709 result.Raise("Failed to repair storage unit '%s' on %s" % 1710 (self.op.name, self.op.node_name))
1711