
Source Code for Module ganeti.cmdlib.node

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
  EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey

def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should

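# Worked example of the promotion rule above (illustrative comment, not part
# of the original module):
#
#   >>> cp_size = 3                              # candidate_pool_size
#   >>> mc_now, mc_should = 2, 2                 # current candidates/target
#   >>> mc_should = min(mc_should + 1, cp_size)  # new node raises the target
#   >>> mc_now < mc_should                       # 2 < 3: promote
#   True
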
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)

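# Call-site sketch (illustrative): the prereq flag selects the exception
# type, so callers pass prereq=True from CheckPrereq and prereq=False from
# Exec, as the two call sites later in this module do:
#
#   _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
#   _CheckNodeHasSecondaryIP(self, self.new_node,
#                            self.new_node.secondary_ip, False)
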
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def HooksAbortCallBack(self, phase, feedback_fn, exception):
    """Cleans up if the hooks fail.

    This function runs actions that are necessary to bring the cluster into a
    clean state again. This is necessary if, for example, the hooks of this
    operation failed and left the node in an inconsistent state.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      feedback_fn("Pre operation hook failed. Rolling back preparations.")

      master_node = self.cfg.GetMasterNodeInfo().name
      remove_result = self.rpc.call_node_ssh_key_remove_light(
        [master_node],
        self.op.node_name)
      remove_result[master_node].Raise(
        "Error removing SSH key of node '%s'." % self.op.node_name)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip,
                                 errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    @type readd: boolean
    @param readd: whether or not this node is readded

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
      # clear previous keys
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True, # from authorized keys
        True, # from public keys
        False, # clear authorized keys
        True, # clear public keys
        True, # it's a readd
        self.op.debug,
        self.op.verbose)
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate, self.op.debug, self.op.verbose)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node,
                               self.new_node.secondary_ip, False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.cfg.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # We create a new certificate even if the node is readded
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # Update SSH setup of all nodes
    if self.op.node_setup:
      # FIXME: so far, all nodes are considered potential master candidates
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)

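# How this LU is typically driven (illustrative sketch; the opcode fields
# shown are the ones handled above, the host name is made up):
#
#   >>> op = opcodes.OpNodeAdd(node_name="node4.example.com",
#   ...                        group="default", readd=False)
#
# Submitting such an opcode (e.g. via "gnt-node add node4.example.com")
# runs CheckArguments, CheckPrereq and Exec above on the master daemon.
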
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

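  # Sanity check of the mapping above (illustrative comment only): a node
  # whose (master_candidate, drained, offline) flags are (True, False, False)
  # has the candidate role, and _R2F inverts the mapping:
  #
  #   >>> LUNodeSetParams._F2R[(True, False, False)]
  #   0                                  # == _ROLE_CANDIDATE
  #   >>> LUNodeSetParams._R2F[LUNodeSetParams._ROLE_CANDIDATE]
  #   (True, False, False)
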
  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid"
                                   " IPv4 address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    disks = self.cfg.GetInstanceDisks(instance.uuid)
    any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)
    return (any_mirrored and
            self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for
    # anything but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      if self.op.node_uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("Master must remain master capable",
                                   errors.ECODE_STATE)
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained,
                   self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a"
                                     " multi-homed cluster, unless the"
                                     " --force option is passed, and the"
                                     " target node is the master",
                                     errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by"
                                       " TCP based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # Tell the node to demote itself, if no longer MC and not offline.
      # This must be done only after the configuration is updated so that
      # it's ensured the node won't receive any further configuration
      # updates.
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(
          self, [node.uuid], master_node, potential_master_candidates,
          feedback_fn, modify_ssh_setup)

      # if node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

      # this will trigger job queue propagation or cleanup if the mc
      # flag changed
      if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:

        if modify_ssh_setup:
          if self.old_role == self._ROLE_CANDIDATE:
            master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
            ssh_result = self.rpc.call_node_ssh_key_remove(
              [master_node],
              node.uuid, node.name,
              master_candidate_uuids, potential_master_candidates,
              True, # remove node's key from all nodes' authorized_keys file
              False, # currently, all nodes are potential master candidates
              False, # do not clear node's 'authorized_keys'
              False, # do not clear node's 'ganeti_pub_keys'
              False, # no readd
              self.op.debug,
              self.op.verbose)
            ssh_result[master_node].Raise(
              "Could not adjust the SSH setup after demoting node '%s'"
              " (UUID: %s)." % (node.name, node.uuid))
            WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

          if self.new_role == self._ROLE_CANDIDATE:
            AddMasterCandidateSshKey(
              self, master_node, node, potential_master_candidates,
              feedback_fn)

    return result

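# Typical invocation (illustrative sketch; mirrors "gnt-node modify
# --offline yes --auto-promote"; the node name is made up):
#
#   >>> op = opcodes.OpNodeSetParams(node_name="node2.example.com",
#   ...                              offline=True, auto_promote=True)
#
# Executing it recomputes the node's role via _F2R and, because offlining
# might demote a master candidate, auto_promote makes the LU lock all nodes
# so AdjustCandidatePool can promote a replacement.
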
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload

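# Invocation sketch (illustrative; mirrors "gnt-node powercycle"; the node
# name is made up):
#
#   >>> op = opcodes.OpNodePowercycle(node_name="node3.example.com",
#   ...                               force=False)
#
# force=True is only needed when powercycling the master node itself.
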
def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceSecondaryNodes(inst.uuid))


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceNodes(inst.uuid))

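# Relationship between the three helpers above (illustrative): each one is
# _GetNodeInstancesInner with a different predicate, so for a DRBD instance
# with primary node A and secondary node B:
#
#   inst in _GetNodePrimaryInstances(cfg, A)      # True
#   inst in _GetNodeSecondaryInstances(cfg, B)    # True
#   inst in _GetNodeInstances(cfg, A)             # True (and for B, too)
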
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once
    # locks have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation
      # mode per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were"
                               " acquired, current groups are '%s', used to"
                               " be '%s'; retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ \
      (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(
        evac_mode=self.op.mode, instances=list(self.instance_names),
        ignore_soft_errors=self.op.ignore_soft_errors)
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)

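# Shape of the result above (illustrative): with an explicit remote node,
# each instance gets its own single-opcode job, so three instances yield
#
#   jobs == [[OpInstanceReplaceDisks(...)],   # job 1
#            [OpInstanceReplaceDisks(...)],   # job 2
#            [OpInstanceReplaceDisks(...)]]   # job 3
#
# and ResultWithJobs(jobs) submits them as independent jobs that the
# scheduler may run in parallel.
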
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options
    # to OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency
    # model will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)

def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    return [[cfg.GetSharedFileStorageDir()]]
  elif storage_type == constants.ST_GLUSTER:
    return [[cfg.GetGlusterStorageDir()]]
  else:
    return []

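# Return shape (illustrative; the directory below is an assumed example):
# file-like storage types carry their base directory as the single extra
# argument, all other types need none:
#
#   >>> _GetStorageTypeArgs(cfg, constants.ST_FILE)
#   [['/srv/ganeti/file-storage']]
#   >>> _GetStorageTypeArgs(cfg, constants.ST_LVM_VG)
#   []
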
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified"
                                 " for storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))

def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)

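# Usage sketch (illustrative): unknown field names abort the opcode early:
#
#   >>> known = utils.FieldSet("name", "size")
#   >>> _CheckOutputFields(known, ["name"])          # passes silently
#   >>> _CheckOutputFields(known, ["name", "bogus"])
#   Traceback (most recent call last):
#     ...
#   OpPrereqError: Unknown output fields selected: bogus
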
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE,
                                      constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(),
                              self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result

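# Output sketch (illustrative; node names and sizes are made up): querying
# ["node", "name", "size"] for LVM-VG storage yields one row per storage
# unit, with SF_NODE/SF_TYPE filled in by the LU and the rest taken from
# the storage_list RPC payload:
#
#   [["node1.example.com", "xenvg", 409600],
#    ["node2.example.com", "xenvg", 409600]]
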
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to"
                                 " another node is required",
                                 errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the"
                                   " node, please remove first" %
                                   instance.name, errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node"
                 " %s", self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    if modify_ssh_setup:
      # retrieve the list of potential master candidates before the node is
      # removed
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate, # from_authorized_keys
        potential_master_candidate, # from_public_keys
        True, # clear node's 'authorized_keys'
        True, # clear node's 'ganeti_public_keys'
        False, # no readd
        self.op.debug,
        self.op.verbose)
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(
      self, [self.node.uuid], master_node, potential_master_candidates,
      feedback_fn, modify_ssh_setup)
    self.cfg.RemoveNode(self.node.uuid)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name,
                                              modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)

class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))