
Source Code for Module ganeti.cmdlib.node

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
  EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey

def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should

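
# A minimal, standalone sketch of the promotion rule above, using made-up
# numbers in place of the live configuration calls: with a candidate pool
# size of 10, three current candidates and a current target of three,
# adding a node bumps the target to four, so the newcomer should promote
# itself.
_example_cp_size = 10
_example_mc_now, _example_mc_should = 3, 3
_example_mc_should = min(_example_mc_should + 1, _example_cp_size)
assert _example_mc_now < _example_mc_should  # i.e. promote
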
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)

class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip,
                                 errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip,
                            constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self,
                                                   exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION,
                                  result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
        [node_name], vparams, cname,
        self.cfg.GetClusterInfo().hvparams,
        {node_name: self.node_group},
        self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
        self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd,
                 feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    @type readd: boolean
    @param readd: whether or not this node is readded

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
      # clear previous keys
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True,  # from authorized keys
        True,  # from public keys
        False, # clear authorized keys
        True,  # clear public keys
        True)  # it's a readd
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node,
                               self.new_node.secondary_ip, False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
      node_verifier_uuids, node_verify_param,
      self.cfg.GetClusterName(),
      self.cfg.GetClusterInfo().hvparams,
      {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
      self.cfg.GetAllNodeGroupsInfoDict()
      )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # We create a new certificate even if the node is readded
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # Update SSH setup of all nodes
    if self.op.node_setup:
      # FIXME: so far, all nodes are considered potential master candidates
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)

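
# Hedged usage sketch: LUNodeAdd is normally reached through the OpNodeAdd
# opcode (as submitted by "gnt-node add"). The field names below mirror the
# op attributes this LU reads (node_name, secondary_ip, readd, group); the
# host name and address are illustrative only.
_example_add_op = opcodes.OpNodeAdd(node_name="node4.example.com",
                                    secondary_ip="192.0.2.14",
                                    readd=False,
                                    group="default")
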
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid"
                                   " IPv4 address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    disks = self.cfg.GetInstanceDisks(instance.uuid)
    any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)
    return (any_mirrored and
            self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for
    # anything but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset",
                     attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before"
                                    " its offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node"
                                  " %s as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      if self.op.node_uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("Master must remain master capable",
                                   errors.ECODE_STATE)
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained,
                   self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif (not master_singlehomed and
            self.op.secondary_ip == node.primary_ip):
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a"
                                     " multi-homed cluster, unless the"
                                     " --force option is passed, and the"
                                     " target node is the master",
                                     errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by"
                                       " TCP based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # Tell the node to demote itself, if no longer MC and not offline.
      # This must be done only after the configuration is updated so that
      # it's ensured the node won't receive any further configuration
      # updates.
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(
          self, [node.uuid], master_node, potential_master_candidates,
          feedback_fn, modify_ssh_setup)

      # if node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

      # this will trigger job queue propagation or cleanup if the mc
      # flag changed
      if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
        self.context.ReaddNode(node)

      if modify_ssh_setup:
        if self.old_role == self._ROLE_CANDIDATE:
          master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
          ssh_result = self.rpc.call_node_ssh_key_remove(
            [master_node],
            node.uuid, node.name,
            master_candidate_uuids, potential_master_candidates,
            True,  # remove node's key from all nodes' authorized_keys file
            False, # currently, all nodes are potential master candidates
            False, # do not clear node's 'authorized_keys'
            False, # do not clear node's 'ganeti_pub_keys'
            False) # no readd
          ssh_result[master_node].Raise(
            "Could not adjust the SSH setup after demoting node '%s'"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

        if self.new_role == self._ROLE_CANDIDATE:
          AddMasterCandidateSshKey(
            self, master_node, node, potential_master_candidates,
            feedback_fn)

    return result

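
# Small sketch of the role mapping used above: the _F2R keys are
# (master_candidate, drained, offline) tuples with at most one True flag,
# and _R2F is the exact inverse, so a computed role can be applied back to
# the node flags.
_example_flags = (False, True, False)
assert LUNodeSetParams._F2R[_example_flags] == LUNodeSetParams._ROLE_DRAINED
assert LUNodeSetParams._R2F[LUNodeSetParams._ROLE_DRAINED] == _example_flags
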
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload

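
# Hedged usage sketch: a powercycle is requested via the OpNodePowercycle
# opcode; per the CheckArguments check above, "force" must be set when the
# target is the master node. The node name is illustrative.
_example_powercycle = opcodes.OpNodePowercycle(node_name="node4.example.com",
                                               force=True)
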
def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]

def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)

def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                cfg.GetInstanceSecondaryNodes(inst.uuid))

def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                cfg.GetInstanceNodes(inst.uuid))

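
# Illustration of the filter pattern shared by the three helpers above,
# using plain stand-in objects instead of a live config: each helper only
# varies the predicate handed to _GetNodeInstancesInner.
class _ExampleInst(object):
  def __init__(self, name, primary_node):
    self.name = name
    self.primary_node = primary_node

_example_insts = [_ExampleInst("inst1", "uuid-a"),
                  _ExampleInst("inst2", "uuid-b")]
_example_primaries = [i for i in _example_insts
                      if i.primary_node == "uuid-a"]
assert [i.name for i in _example_primaries] == ["inst1"]
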
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once
    # locks have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation
      # mode per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and
      # group locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically,
      # needs verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were"
                               " acquired, current groups are '%s', used to"
                               " be '%s'; retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ \
      (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(
        evac_mode=self.op.mode, instances=list(self.instance_names),
        ignore_soft_errors=self.op.ignore_soft_errors)
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release,
                                True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)

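
# Hedged usage sketch: the evacuation modes handled by _DetermineInstances
# above are the NODE_EVAC_* constants. Evacuating secondaries onto an
# explicit target node (no iallocator) would look roughly like this; node
# names are illustrative.
_example_evac = opcodes.OpNodeEvacuate(node_name="node4.example.com",
                                       remote_node="node5.example.com",
                                       mode=constants.NODE_EVAC_SEC)
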
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options
    # to OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency
    # model will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)

def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    return [[cfg.GetSharedFileStorageDir()]]
  elif storage_type == constants.ST_GLUSTER:
    return [[cfg.GetGlusterStorageDir()]]
  else:
    return []

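
# Minimal sketch of _GetStorageTypeArgs with a stub standing in for the
# config object (only the three getters the function calls are stubbed;
# the directory paths are made up): file-like storage types yield their
# configured directory wrapped in a nested list, everything else yields
# no extra arguments.
class _ExampleCfg(object):
  def GetFileStorageDir(self):
    return "/srv/ganeti/file-storage"
  def GetSharedFileStorageDir(self):
    return "/srv/ganeti/shared-file-storage"
  def GetGlusterStorageDir(self):
    return "/var/run/ganeti/gluster"

assert (_GetStorageTypeArgs(_ExampleCfg(), constants.ST_FILE) ==
        [["/srv/ganeti/file-storage"]])
assert _GetStorageTypeArgs(_ExampleCfg(), constants.ST_LVM_VG) == []
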
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified"
                                 " for storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))

def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)

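
# Small sketch of _CheckOutputFields, assuming utils.FieldSet matches field
# names as the docstring above suggests and that the selection may be a
# plain list (as the callers below pass self.op.output_fields): an unknown
# field raises OpPrereqError, a fully matching selection passes.
_example_fields = utils.FieldSet("node", "name", "size")
_CheckOutputFields(_example_fields, ["name"])  # passes silently
try:
  _CheckOutputFields(_example_fields, ["bogus"])
except errors.OpPrereqError:
  pass  # "bogus" is not a known output field
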
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE,
                                      constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = \
      self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(),
                              self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported."
            " Please use the --storage-type option to specify one of the"
            " supported storage types (%s) or set the default disk template"
            " to one that supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result

class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to"
                                 " another node is required",
                                 errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the"
                                   " node, please remove first" %
                                   instance.name, errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node"
                 " %s", self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    if modify_ssh_setup:
      # retrieve the list of potential master candidates before the node is
      # removed
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate, # from_authorized_keys
        potential_master_candidate, # from_public_keys
        True,  # clear node's 'authorized_keys'
        True,  # clear node's 'ganeti_public_keys'
        False) # no readd
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(
      self, [self.node.uuid], master_node, potential_master_candidates,
      feedback_fn, modify_ssh_setup)
    self.context.RemoveNode(self.cfg, self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name,
                                              modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)

class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))