Package ganeti :: Package cmdlib :: Module node
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes, WarnAboutFailedSshUpdates 
  57   
  58   
59 -def _DecideSelfPromotion(lu, exceptions=None):
60 """Decide whether I should promote myself as a master candidate. 61 62 """ 63 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size 64 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) 65 # the new node will increase mc_max with one, so: 66 mc_should = min(mc_should + 1, cp_size) 67 return mc_now < mc_should
68 69
70 -def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
71 """Ensure that a node has the given secondary ip. 72 73 @type lu: L{LogicalUnit} 74 @param lu: the LU on behalf of which we make the check 75 @type node: L{objects.Node} 76 @param node: the node to check 77 @type secondary_ip: string 78 @param secondary_ip: the ip to check 79 @type prereq: boolean 80 @param prereq: whether to throw a prerequisite or an execute error 81 @raise errors.OpPrereqError: if the node doesn't have the ip, 82 and prereq=True 83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False 84 85 """ 86 # this can be called with a new node, which has no UUID yet, so perform the 87 # RPC call using its name 88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip) 89 result.Raise("Failure checking secondary ip on node %s" % node.name, 90 prereq=prereq, ecode=errors.ECODE_ENVIRON) 91 if not result.payload: 92 msg = ("Node claims it doesn't have the secondary ip you gave (%s)," 93 " please fix and re-run this command" % secondary_ip) 94 if prereq: 95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) 96 else: 97 raise errors.OpExecError(msg)
98 99
100 -class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster. 102 103 """ 104 HPATH = "node-add" 105 HTYPE = constants.HTYPE_NODE 106 _NFLAGS = ["master_capable", "vm_capable"] 107
108 - def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily() 110 # validate/normalize the node name 111 self.hostname = netutils.GetHostname(name=self.op.node_name, 112 family=self.primary_ip_family) 113 self.op.node_name = self.hostname.name 114 115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName(): 116 raise errors.OpPrereqError("Cannot readd the master node", 117 errors.ECODE_STATE) 118 119 if self.op.readd and self.op.group: 120 raise errors.OpPrereqError("Cannot pass a node group when a node is" 121 " being readded", errors.ECODE_INVAL)
122
123 - def BuildHooksEnv(self):
124 """Build hooks env. 125 126 This will run on all nodes before, and on all nodes + the new node after. 127 128 """ 129 return { 130 "OP_TARGET": self.op.node_name, 131 "NODE_NAME": self.op.node_name, 132 "NODE_PIP": self.op.primary_ip, 133 "NODE_SIP": self.op.secondary_ip, 134 "MASTER_CAPABLE": str(self.op.master_capable), 135 "VM_CAPABLE": str(self.op.vm_capable), 136 }
137
138 - def BuildHooksNodes(self):
139 """Build hooks nodes. 140 141 """ 142 hook_nodes = self.cfg.GetNodeList() 143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name) 144 if new_node_info is not None: 145 # Exclude added node 146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid])) 147 148 # add the new node as post hook node by name; it does not have an UUID yet 149 return (hook_nodes, hook_nodes)
150
151 - def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
154 - def CheckPrereq(self):
155 """Check prerequisites. 156 157 This checks: 158 - the new node is not already in the config 159 - it is resolvable 160 - its parameters (single/dual homed) matches the cluster 161 162 Any errors are signaled by raising errors.OpPrereqError. 163 164 """ 165 node_name = self.hostname.name 166 self.op.primary_ip = self.hostname.ip 167 if self.op.secondary_ip is None: 168 if self.primary_ip_family == netutils.IP6Address.family: 169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid" 170 " IPv4 address must be given as secondary", 171 errors.ECODE_INVAL) 172 self.op.secondary_ip = self.op.primary_ip 173 174 secondary_ip = self.op.secondary_ip 175 if not netutils.IP4Address.IsValid(secondary_ip): 176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 177 " address" % secondary_ip, errors.ECODE_INVAL) 178 179 existing_node_info = self.cfg.GetNodeInfoByName(node_name) 180 if not self.op.readd and existing_node_info is not None: 181 raise errors.OpPrereqError("Node %s is already in the configuration" % 182 node_name, errors.ECODE_EXISTS) 183 elif self.op.readd and existing_node_info is None: 184 raise errors.OpPrereqError("Node %s is not in the configuration" % 185 node_name, errors.ECODE_NOENT) 186 187 self.changed_primary_ip = False 188 189 for existing_node in self.cfg.GetAllNodesInfo().values(): 190 if self.op.readd and node_name == existing_node.name: 191 if existing_node.secondary_ip != secondary_ip: 192 raise errors.OpPrereqError("Readded node doesn't have the same IP" 193 " address configuration as before", 194 errors.ECODE_INVAL) 195 if existing_node.primary_ip != self.op.primary_ip: 196 self.changed_primary_ip = True 197 198 continue 199 200 if (existing_node.primary_ip == self.op.primary_ip or 201 existing_node.secondary_ip == self.op.primary_ip or 202 existing_node.primary_ip == secondary_ip or 203 existing_node.secondary_ip == secondary_ip): 204 raise errors.OpPrereqError("New node ip address(es) conflict with" 205 " existing node %s" % existing_node.name, 206 errors.ECODE_NOTUNIQUE) 207 208 # After this 'if' block, None is no longer a valid value for the 209 # _capable op attributes 210 if self.op.readd: 211 assert existing_node_info is not None, \ 212 "Can't retrieve locked node %s" % node_name 213 for attr in self._NFLAGS: 214 if getattr(self.op, attr) is None: 215 setattr(self.op, attr, getattr(existing_node_info, attr)) 216 else: 217 for attr in self._NFLAGS: 218 if getattr(self.op, attr) is None: 219 setattr(self.op, attr, True) 220 221 if self.op.readd and not self.op.vm_capable: 222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid) 223 if pri or sec: 224 raise errors.OpPrereqError("Node %s being re-added with vm_capable" 225 " flag set to false, but it already holds" 226 " instances" % node_name, 227 errors.ECODE_STATE) 228 229 # check that the type of the node (single versus dual homed) is the 230 # same as for the master 231 myself = self.cfg.GetMasterNodeInfo() 232 master_singlehomed = myself.secondary_ip == myself.primary_ip 233 newbie_singlehomed = secondary_ip == self.op.primary_ip 234 if master_singlehomed != newbie_singlehomed: 235 if master_singlehomed: 236 raise errors.OpPrereqError("The master has no secondary ip but the" 237 " new node has one", 238 errors.ECODE_INVAL) 239 else: 240 raise errors.OpPrereqError("The master has a secondary ip but the" 241 " new node doesn't have one", 242 errors.ECODE_INVAL) 243 244 # checks reachability 245 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT): 246 raise errors.OpPrereqError("Node not reachable by ping", 247 errors.ECODE_ENVIRON) 248 249 if not newbie_singlehomed: 250 # check reachability from my secondary ip to newbie's secondary ip 251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, 252 source=myself.secondary_ip): 253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 254 " based ping to node daemon port", 255 errors.ECODE_ENVIRON) 256 257 if self.op.readd: 258 exceptions = [existing_node_info.uuid] 259 else: 260 exceptions = [] 261 262 if self.op.master_capable: 263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) 264 else: 265 self.master_candidate = False 266 267 self.node_group = None 268 if self.op.readd: 269 self.new_node = existing_node_info 270 self.node_group = existing_node_info.group 271 else: 272 self.node_group = self.cfg.LookupNodeGroup(self.op.group) 273 self.new_node = objects.Node(name=node_name, 274 primary_ip=self.op.primary_ip, 275 secondary_ip=secondary_ip, 276 master_candidate=self.master_candidate, 277 offline=False, drained=False, 278 group=self.node_group, ndparams={}) 279 280 if self.op.ndparams: 281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 283 "node", "cluster or group") 284 285 if self.op.hv_state: 286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) 287 288 if self.op.disk_state: 289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) 290 291 # TODO: If we need to have multiple DnsOnlyRunner we probably should make 292 # it a property on the base class. 293 rpcrunner = rpc.DnsOnlyRunner() 294 result = rpcrunner.call_version([node_name])[node_name] 295 result.Raise("Can't get version information from node %s" % node_name, 296 prereq=True, ecode=errors.ECODE_ENVIRON) 297 if constants.PROTOCOL_VERSION == result.payload: 298 logging.info("Communication to node %s fine, sw version %s match", 299 node_name, result.payload) 300 else: 301 raise errors.OpPrereqError("Version mismatch master version %s," 302 " node version %s" % 303 (constants.PROTOCOL_VERSION, result.payload), 304 errors.ECODE_ENVIRON) 305 306 vg_name = self.cfg.GetVGName() 307 if vg_name is not None: 308 vparams = {constants.NV_PVLIST: [vg_name]} 309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node) 310 cname = self.cfg.GetClusterName() 311 result = rpcrunner.call_node_verify_light( 312 [node_name], vparams, cname, 313 self.cfg.GetClusterInfo().hvparams, 314 {node_name: self.node_group}, 315 self.cfg.GetAllNodeGroupsInfoDict() 316 )[node_name] 317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) 318 if errmsgs: 319 raise errors.OpPrereqError("Checks on node PVs failed: %s" % 320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
322 - def _InitOpenVSwitch(self):
323 filled_ndparams = self.cfg.GetClusterInfo().FillND( 324 self.new_node, self.cfg.GetNodeGroup(self.new_node.group)) 325 326 ovs = filled_ndparams.get(constants.ND_OVS, None) 327 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None) 328 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None) 329 330 if ovs: 331 if not ovs_link: 332 self.LogInfo("No physical interface for OpenvSwitch was given." 333 " OpenvSwitch will not have an outside connection. This" 334 " might not be what you want.") 335 336 result = self.rpc.call_node_configure_ovs( 337 self.new_node.name, ovs_name, ovs_link) 338 result.Raise("Failed to initialize OpenVSwitch on new node")
339
340 - def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate, 341 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
342 """Update the SSH setup of all nodes after adding a new node. 343 344 @type readd: boolean 345 @param readd: whether or not this node is readded 346 347 """ 348 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 349 master_node = self.cfg.GetMasterNode() 350 351 if readd: 352 # clear previous keys 353 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 354 remove_result = rpcrunner.call_node_ssh_key_remove( 355 [master_node], 356 new_node_uuid, new_node_name, 357 master_candidate_uuids, 358 potential_master_candidates, 359 True, # from authorized keys 360 True, # from public keys 361 False, # clear authorized keys 362 True, # clear public keys 363 True) # it's a readd 364 remove_result[master_node].Raise( 365 "Could not remove SSH keys of node %s before readding," 366 " (UUID: %s)." % (new_node_name, new_node_uuid)) 367 WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn) 368 369 result = rpcrunner.call_node_ssh_key_add( 370 [master_node], new_node_uuid, new_node_name, 371 potential_master_candidates, 372 is_master_candidate, is_potential_master_candidate, 373 is_potential_master_candidate) 374 375 result[master_node].Raise("Could not update the node's SSH setup.") 376 WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
377
378 - def Exec(self, feedback_fn):
379 """Adds the new node to the cluster. 380 381 """ 382 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 383 "Not owning BGL" 384 385 # We adding a new node so we assume it's powered 386 self.new_node.powered = True 387 388 # for re-adds, reset the offline/drained/master-candidate flags; 389 # we need to reset here, otherwise offline would prevent RPC calls 390 # later in the procedure; this also means that if the re-add 391 # fails, we are left with a non-offlined, broken node 392 if self.op.readd: 393 self.new_node.offline = False 394 self.new_node.drained = False 395 self.LogInfo("Readding a node, the offline/drained flags were reset") 396 # if we demote the node, we do cleanup later in the procedure 397 self.new_node.master_candidate = self.master_candidate 398 if self.changed_primary_ip: 399 self.new_node.primary_ip = self.op.primary_ip 400 401 # copy the master/vm_capable flags 402 for attr in self._NFLAGS: 403 setattr(self.new_node, attr, getattr(self.op, attr)) 404 405 # notify the user about any possible mc promotion 406 if self.new_node.master_candidate: 407 self.LogInfo("Node will be a master candidate") 408 409 if self.op.ndparams: 410 self.new_node.ndparams = self.op.ndparams 411 else: 412 self.new_node.ndparams = {} 413 414 if self.op.hv_state: 415 self.new_node.hv_state_static = self.new_hv_state 416 417 if self.op.disk_state: 418 self.new_node.disk_state_static = self.new_disk_state 419 420 # Add node to our /etc/hosts, and add key to known_hosts 421 if self.cfg.GetClusterInfo().modify_etc_hosts: 422 master_node = self.cfg.GetMasterNode() 423 result = self.rpc.call_etc_hosts_modify( 424 master_node, constants.ETC_HOSTS_ADD, self.hostname.name, 425 self.hostname.ip) 426 result.Raise("Can't update hosts file with new host data") 427 428 if self.new_node.secondary_ip != self.new_node.primary_ip: 429 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip, 430 False) 431 432 node_verifier_uuids = [self.cfg.GetMasterNode()] 433 node_verify_param = { 434 constants.NV_NODELIST: ([self.new_node.name], {}, []), 435 # TODO: do a node-net-test as well? 436 } 437 438 result = self.rpc.call_node_verify( 439 node_verifier_uuids, node_verify_param, 440 self.cfg.GetClusterName(), 441 self.cfg.GetClusterInfo().hvparams, 442 {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)}, 443 self.cfg.GetAllNodeGroupsInfoDict() 444 ) 445 for verifier in node_verifier_uuids: 446 result[verifier].Raise("Cannot communicate with node %s" % verifier) 447 nl_payload = result[verifier].payload[constants.NV_NODELIST] 448 if nl_payload: 449 for failed in nl_payload: 450 feedback_fn("ssh/hostname verification failed" 451 " (checking from %s): %s" % 452 (verifier, nl_payload[failed])) 453 raise errors.OpExecError("ssh/hostname verification failed") 454 455 self._InitOpenVSwitch() 456 457 if self.op.readd: 458 self.context.ReaddNode(self.new_node) 459 RedistributeAncillaryFiles(self) 460 # make sure we redistribute the config 461 self.cfg.Update(self.new_node, feedback_fn) 462 # and make sure the new node will not have old files around 463 if not self.new_node.master_candidate: 464 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid) 465 result.Warn("Node failed to demote itself from master candidate status", 466 self.LogWarning) 467 else: 468 self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId()) 469 RedistributeAncillaryFiles(self) 470 471 # We create a new certificate even if the node is readded 472 digest = GetClientCertDigest(self, self.new_node.uuid) 473 if self.new_node.master_candidate: 474 self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest) 475 else: 476 self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None) 477 478 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid]) 479 480 # Update SSH setup of all nodes 481 if self.op.node_setup: 482 # FIXME: so far, all nodes are considered potential master candidates 483 self._SshUpdate(self.new_node.uuid, self.new_node.name, 484 self.new_node.master_candidate, True, 485 self.rpc, self.op.readd, feedback_fn)
486 487
488 -class LUNodeSetParams(LogicalUnit):
489 """Modifies the parameters of a node. 490 491 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 492 to the node role (as _ROLE_*) 493 @cvar _R2F: a dictionary from node role to tuples of flags 494 @cvar _FLAGS: a list of attribute names corresponding to the flags 495 496 """ 497 HPATH = "node-modify" 498 HTYPE = constants.HTYPE_NODE 499 REQ_BGL = False 500 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 501 _F2R = { 502 (True, False, False): _ROLE_CANDIDATE, 503 (False, True, False): _ROLE_DRAINED, 504 (False, False, True): _ROLE_OFFLINE, 505 (False, False, False): _ROLE_REGULAR, 506 } 507 _R2F = dict((v, k) for k, v in _F2R.items()) 508 _FLAGS = ["master_candidate", "drained", "offline"] 509
510 - def CheckArguments(self):
511 (self.op.node_uuid, self.op.node_name) = \ 512 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 513 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 514 self.op.master_capable, self.op.vm_capable, 515 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 516 self.op.disk_state] 517 if all_mods.count(None) == len(all_mods): 518 raise errors.OpPrereqError("Please pass at least one modification", 519 errors.ECODE_INVAL) 520 if all_mods.count(True) > 1: 521 raise errors.OpPrereqError("Can't set the node into more than one" 522 " state at the same time", 523 errors.ECODE_INVAL) 524 525 # Boolean value that tells us whether we might be demoting from MC 526 self.might_demote = (self.op.master_candidate is False or 527 self.op.offline is True or 528 self.op.drained is True or 529 self.op.master_capable is False) 530 531 if self.op.secondary_ip: 532 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 533 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 534 " address" % self.op.secondary_ip, 535 errors.ECODE_INVAL) 536 537 self.lock_all = self.op.auto_promote and self.might_demote 538 self.lock_instances = self.op.secondary_ip is not None
539
540 - def _InstanceFilter(self, instance):
541 """Filter for getting affected instances. 542 543 """ 544 disks = self.cfg.GetInstanceDisks(instance.uuid) 545 any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR) 546 return (any_mirrored and 547 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
548
549 - def ExpandNames(self):
550 if self.lock_all: 551 self.needed_locks = { 552 locking.LEVEL_NODE: locking.ALL_SET, 553 } 554 else: 555 self.needed_locks = { 556 locking.LEVEL_NODE: self.op.node_uuid, 557 } 558 559 # Since modifying a node can have severe effects on currently running 560 # operations the resource lock is at least acquired in shared mode 561 self.needed_locks[locking.LEVEL_NODE_RES] = \ 562 self.needed_locks[locking.LEVEL_NODE] 563 564 # Get all locks except nodes in shared mode; they are not used for anything 565 # but read-only access 566 self.share_locks = ShareAll() 567 self.share_locks[locking.LEVEL_NODE] = 0 568 self.share_locks[locking.LEVEL_NODE_RES] = 0 569 570 if self.lock_instances: 571 self.needed_locks[locking.LEVEL_INSTANCE] = \ 572 self.cfg.GetInstanceNames( 573 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
574
575 - def BuildHooksEnv(self):
576 """Build hooks env. 577 578 This runs on the master node. 579 580 """ 581 return { 582 "OP_TARGET": self.op.node_name, 583 "MASTER_CANDIDATE": str(self.op.master_candidate), 584 "OFFLINE": str(self.op.offline), 585 "DRAINED": str(self.op.drained), 586 "MASTER_CAPABLE": str(self.op.master_capable), 587 "VM_CAPABLE": str(self.op.vm_capable), 588 }
589
590 - def BuildHooksNodes(self):
591 """Build hooks nodes. 592 593 """ 594 nl = [self.cfg.GetMasterNode(), self.op.node_uuid] 595 return (nl, nl)
596
597 - def CheckPrereq(self):
598 """Check prerequisites. 599 600 This only checks the instance list against the existing names. 601 602 """ 603 node = self.cfg.GetNodeInfo(self.op.node_uuid) 604 if self.lock_instances: 605 affected_instances = \ 606 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 607 608 # Verify instance locks 609 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 610 wanted_instance_names = frozenset([inst.name for inst in 611 affected_instances.values()]) 612 if wanted_instance_names - owned_instance_names: 613 raise errors.OpPrereqError("Instances affected by changing node %s's" 614 " secondary IP address have changed since" 615 " locks were acquired, wanted '%s', have" 616 " '%s'; retry the operation" % 617 (node.name, 618 utils.CommaJoin(wanted_instance_names), 619 utils.CommaJoin(owned_instance_names)), 620 errors.ECODE_STATE) 621 else: 622 affected_instances = None 623 624 if (self.op.master_candidate is not None or 625 self.op.drained is not None or 626 self.op.offline is not None): 627 # we can't change the master's node flags 628 if node.uuid == self.cfg.GetMasterNode(): 629 raise errors.OpPrereqError("The master role can be changed" 630 " only via master-failover", 631 errors.ECODE_INVAL) 632 633 if self.op.master_candidate and not node.master_capable: 634 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 635 " it a master candidate" % node.name, 636 errors.ECODE_STATE) 637 638 if self.op.vm_capable is False: 639 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) 640 if ipri or isec: 641 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 642 " the vm_capable flag" % node.name, 643 errors.ECODE_STATE) 644 645 if node.master_candidate and self.might_demote and not self.lock_all: 646 assert not self.op.auto_promote, "auto_promote set but lock_all not" 647 # check if after removing the current node, we're missing master 648 # candidates 649 (mc_remaining, mc_should, _) = \ 650 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) 651 if mc_remaining < mc_should: 652 raise errors.OpPrereqError("Not enough master candidates, please" 653 " pass auto promote option to allow" 654 " promotion (--auto-promote or RAPI" 655 " auto_promote=True)", errors.ECODE_STATE) 656 657 self.old_flags = old_flags = (node.master_candidate, 658 node.drained, node.offline) 659 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 660 self.old_role = old_role = self._F2R[old_flags] 661 662 # Check for ineffective changes 663 for attr in self._FLAGS: 664 if getattr(self.op, attr) is False and getattr(node, attr) is False: 665 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 666 setattr(self.op, attr, None) 667 668 # Past this point, any flag change to False means a transition 669 # away from the respective state, as only real changes are kept 670 671 # TODO: We might query the real power state if it supports OOB 672 if SupportsOob(self.cfg, node): 673 if self.op.offline is False and not (node.powered or 674 self.op.powered is True): 675 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 676 " offline status can be reset") % 677 self.op.node_name, errors.ECODE_STATE) 678 elif self.op.powered is not None: 679 raise errors.OpPrereqError(("Unable to change powered state for node %s" 680 " as it does not support out-of-band" 681 " handling") % self.op.node_name, 682 errors.ECODE_STATE) 683 684 # If we're being deofflined/drained, we'll MC ourself if needed 685 if (self.op.drained is False or self.op.offline is False or 686 (self.op.master_capable and not node.master_capable)): 687 if _DecideSelfPromotion(self): 688 self.op.master_candidate = True 689 self.LogInfo("Auto-promoting node to master candidate") 690 691 # If we're no longer master capable, we'll demote ourselves from MC 692 if self.op.master_capable is False and node.master_candidate: 693 if self.op.node_uuid == self.cfg.GetMasterNode(): 694 raise errors.OpPrereqError("Master must remain master capable", 695 errors.ECODE_STATE) 696 self.LogInfo("Demoting from master candidate") 697 self.op.master_candidate = False 698 699 # Compute new role 700 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 701 if self.op.master_candidate: 702 new_role = self._ROLE_CANDIDATE 703 elif self.op.drained: 704 new_role = self._ROLE_DRAINED 705 elif self.op.offline: 706 new_role = self._ROLE_OFFLINE 707 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 708 # False is still in new flags, which means we're un-setting (the 709 # only) True flag 710 new_role = self._ROLE_REGULAR 711 else: # no new flags, nothing, keep old role 712 new_role = old_role 713 714 self.new_role = new_role 715 716 if old_role == self._ROLE_OFFLINE and new_role != old_role: 717 # Trying to transition out of offline status 718 result = self.rpc.call_version([node.uuid])[node.uuid] 719 if result.fail_msg: 720 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 721 " to report its version: %s" % 722 (node.name, result.fail_msg), 723 errors.ECODE_STATE) 724 else: 725 self.LogWarning("Transitioning node from offline to online state" 726 " without using re-add. Please make sure the node" 727 " is healthy!") 728 729 # When changing the secondary ip, verify if this is a single-homed to 730 # multi-homed transition or vice versa, and apply the relevant 731 # restrictions. 732 if self.op.secondary_ip: 733 # Ok even without locking, because this can't be changed by any LU 734 master = self.cfg.GetMasterNodeInfo() 735 master_singlehomed = master.secondary_ip == master.primary_ip 736 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 737 if self.op.force and node.uuid == master.uuid: 738 self.LogWarning("Transitioning from single-homed to multi-homed" 739 " cluster; all nodes will require a secondary IP" 740 " address") 741 else: 742 raise errors.OpPrereqError("Changing the secondary ip on a" 743 " single-homed cluster requires the" 744 " --force option to be passed, and the" 745 " target node to be the master", 746 errors.ECODE_INVAL) 747 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 748 if self.op.force and node.uuid == master.uuid: 749 self.LogWarning("Transitioning from multi-homed to single-homed" 750 " cluster; secondary IP addresses will have to be" 751 " removed") 752 else: 753 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 754 " same as the primary IP on a multi-homed" 755 " cluster, unless the --force option is" 756 " passed, and the target node is the" 757 " master", errors.ECODE_INVAL) 758 759 assert not (set([inst.name for inst in affected_instances.values()]) - 760 self.owned_locks(locking.LEVEL_INSTANCE)) 761 762 if node.offline: 763 if affected_instances: 764 msg = ("Cannot change secondary IP address: offline node has" 765 " instances (%s) configured to use it" % 766 utils.CommaJoin( 767 [inst.name for inst in affected_instances.values()])) 768 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 769 else: 770 # On online nodes, check that no instances are running, and that 771 # the node has the new ip and we can reach it. 772 for instance in affected_instances.values(): 773 CheckInstanceState(self, instance, INSTANCE_DOWN, 774 msg="cannot change secondary ip") 775 776 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) 777 if master.uuid != node.uuid: 778 # check reachability from master secondary ip to new secondary ip 779 if not netutils.TcpPing(self.op.secondary_ip, 780 constants.DEFAULT_NODED_PORT, 781 source=master.secondary_ip): 782 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 783 " based ping to node daemon port", 784 errors.ECODE_ENVIRON) 785 786 if self.op.ndparams: 787 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) 788 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 789 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 790 "node", "cluster or group") 791 self.new_ndparams = new_ndparams 792 793 if self.op.hv_state: 794 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 795 node.hv_state_static) 796 797 if self.op.disk_state: 798 self.new_disk_state = \ 799 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
800
801 - def Exec(self, feedback_fn):
802 """Modifies a node. 803 804 """ 805 node = self.cfg.GetNodeInfo(self.op.node_uuid) 806 result = [] 807 808 if self.op.ndparams: 809 node.ndparams = self.new_ndparams 810 811 if self.op.powered is not None: 812 node.powered = self.op.powered 813 814 if self.op.hv_state: 815 node.hv_state_static = self.new_hv_state 816 817 if self.op.disk_state: 818 node.disk_state_static = self.new_disk_state 819 820 for attr in ["master_capable", "vm_capable"]: 821 val = getattr(self.op, attr) 822 if val is not None: 823 setattr(node, attr, val) 824 result.append((attr, str(val))) 825 826 if self.op.secondary_ip: 827 node.secondary_ip = self.op.secondary_ip 828 result.append(("secondary_ip", self.op.secondary_ip)) 829 830 # this will trigger configuration file update, if needed 831 self.cfg.Update(node, feedback_fn) 832 833 if self.new_role != self.old_role: 834 new_flags = self._R2F[self.new_role] 835 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): 836 if of != nf: 837 result.append((desc, str(nf))) 838 (node.master_candidate, node.drained, node.offline) = new_flags 839 self.cfg.Update(node, feedback_fn) 840 841 # Tell the node to demote itself, if no longer MC and not offline. 842 # This must be done only after the configuration is updated so that 843 # it's ensured the node won't receive any further configuration updates. 844 if self.old_role == self._ROLE_CANDIDATE and \ 845 self.new_role != self._ROLE_OFFLINE: 846 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg 847 if msg: 848 self.LogWarning("Node failed to demote itself: %s", msg) 849 850 # we locked all nodes, we adjust the CP before updating this node 851 if self.lock_all: 852 AdjustCandidatePool(self, [node.uuid]) 853 854 # if node gets promoted, grant RPC priviledges 855 if self.new_role == self._ROLE_CANDIDATE: 856 AddNodeCertToCandidateCerts(self, self.cfg, node.uuid) 857 # if node is demoted, revoke RPC priviledges 858 if self.old_role == self._ROLE_CANDIDATE: 859 RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid) 860 861 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid]) 862 863 # this will trigger job queue propagation or cleanup if the mc 864 # flag changed 865 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1: 866 self.context.ReaddNode(node) 867 868 if self.cfg.GetClusterInfo().modify_ssh_setup: 869 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 870 master_node = self.cfg.GetMasterNode() 871 if self.old_role == self._ROLE_CANDIDATE: 872 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 873 ssh_result = self.rpc.call_node_ssh_key_remove( 874 [master_node], 875 node.uuid, node.name, 876 master_candidate_uuids, potential_master_candidates, 877 True, # remove node's key from all nodes' authorized_keys file 878 False, # currently, all nodes are potential master candidates 879 False, # do not clear node's 'authorized_keys' 880 False, # do not clear node's 'ganeti_pub_keys' 881 False) # no readd 882 ssh_result[master_node].Raise( 883 "Could not adjust the SSH setup after demoting node '%s'" 884 " (UUID: %s)." % (node.name, node.uuid)) 885 WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn) 886 887 if self.new_role == self._ROLE_CANDIDATE: 888 ssh_result = self.rpc.call_node_ssh_key_add( 889 [master_node], node.uuid, node.name, 890 potential_master_candidates, 891 True, # add node's key to all node's 'authorized_keys' 892 True, # all nodes are potential master candidates 893 False) # do not update the node's public keys 894 ssh_result[master_node].Raise( 895 "Could not update the SSH setup of node '%s' after promotion" 896 " (UUID: %s)." % (node.name, node.uuid)) 897 WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn) 898 899 return result
900 901
902 -class LUNodePowercycle(NoHooksLU):
903 """Powercycles a node. 904 905 """ 906 REQ_BGL = False 907
908 - def CheckArguments(self):
909 (self.op.node_uuid, self.op.node_name) = \ 910 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 911 912 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force: 913 raise errors.OpPrereqError("The node is the master and the force" 914 " parameter was not set", 915 errors.ECODE_INVAL)
916
917 - def ExpandNames(self):
918 """Locking for PowercycleNode. 919 920 This is a last-resort option and shouldn't block on other 921 jobs. Therefore, we grab no locks. 922 923 """ 924 self.needed_locks = {}
925
926 - def Exec(self, feedback_fn):
927 """Reboots a node. 928 929 """ 930 default_hypervisor = self.cfg.GetHypervisorType() 931 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor] 932 result = self.rpc.call_node_powercycle(self.op.node_uuid, 933 default_hypervisor, 934 hvparams) 935 result.Raise("Failed to schedule the reboot") 936 return result.payload
937 938
939 -def _GetNodeInstancesInner(cfg, fn):
940 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
941 942
943 -def _GetNodePrimaryInstances(cfg, node_uuid):
944 """Returns primary instances on a node. 945 946 """ 947 return _GetNodeInstancesInner(cfg, 948 lambda inst: node_uuid == inst.primary_node)
949 950
951 -def _GetNodeSecondaryInstances(cfg, node_uuid):
952 """Returns secondary instances on a node. 953 954 """ 955 return _GetNodeInstancesInner(cfg, 956 lambda inst: node_uuid in 957 cfg.GetInstanceSecondaryNodes(inst.uuid))
958 959
960 -def _GetNodeInstances(cfg, node_uuid):
961 """Returns a list of all primary and secondary instances on a node. 962 963 """ 964 965 return _GetNodeInstancesInner(cfg, 966 lambda inst: node_uuid in 967 cfg.GetInstanceNodes(inst.uuid))
968 969
970 -class LUNodeEvacuate(NoHooksLU):
971 """Evacuates instances off a list of nodes. 972 973 """ 974 REQ_BGL = False 975
976 - def CheckArguments(self):
977 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
978
979 - def ExpandNames(self):
980 (self.op.node_uuid, self.op.node_name) = \ 981 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 982 983 if self.op.remote_node is not None: 984 (self.op.remote_node_uuid, self.op.remote_node) = \ 985 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 986 self.op.remote_node) 987 assert self.op.remote_node 988 989 if self.op.node_uuid == self.op.remote_node_uuid: 990 raise errors.OpPrereqError("Can not use evacuated node as a new" 991 " secondary node", errors.ECODE_INVAL) 992 993 if self.op.mode != constants.NODE_EVAC_SEC: 994 raise errors.OpPrereqError("Without the use of an iallocator only" 995 " secondary instances can be evacuated", 996 errors.ECODE_INVAL) 997 998 # Declare locks 999 self.share_locks = ShareAll() 1000 self.needed_locks = { 1001 locking.LEVEL_INSTANCE: [], 1002 locking.LEVEL_NODEGROUP: [], 1003 locking.LEVEL_NODE: [], 1004 } 1005 1006 # Determine nodes (via group) optimistically, needs verification once locks 1007 # have been acquired 1008 self.lock_nodes = self._DetermineNodes()
1009
1010 - def _DetermineNodes(self):
1011 """Gets the list of node UUIDs to operate on. 1012 1013 """ 1014 if self.op.remote_node is None: 1015 # Iallocator will choose any node(s) in the same group 1016 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) 1017 else: 1018 group_nodes = frozenset([self.op.remote_node_uuid]) 1019 1020 # Determine nodes to be locked 1021 return set([self.op.node_uuid]) | group_nodes
1022
1023 - def _DetermineInstances(self):
1024 """Builds list of instances to operate on. 1025 1026 """ 1027 assert self.op.mode in constants.NODE_EVAC_MODES 1028 1029 if self.op.mode == constants.NODE_EVAC_PRI: 1030 # Primary instances only 1031 inst_fn = _GetNodePrimaryInstances 1032 assert self.op.remote_node is None, \ 1033 "Evacuating primary instances requires iallocator" 1034 elif self.op.mode == constants.NODE_EVAC_SEC: 1035 # Secondary instances only 1036 inst_fn = _GetNodeSecondaryInstances 1037 else: 1038 # All instances 1039 assert self.op.mode == constants.NODE_EVAC_ALL 1040 inst_fn = _GetNodeInstances 1041 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 1042 # per instance 1043 raise errors.OpPrereqError("Due to an issue with the iallocator" 1044 " interface it is not possible to evacuate" 1045 " all instances at once; specify explicitly" 1046 " whether to evacuate primary or secondary" 1047 " instances", 1048 errors.ECODE_INVAL) 1049 1050 return inst_fn(self.cfg, self.op.node_uuid)
1051
1052 - def DeclareLocks(self, level):
1053 if level == locking.LEVEL_INSTANCE: 1054 # Lock instances optimistically, needs verification once node and group 1055 # locks have been acquired 1056 self.needed_locks[locking.LEVEL_INSTANCE] = \ 1057 set(i.name for i in self._DetermineInstances()) 1058 1059 elif level == locking.LEVEL_NODEGROUP: 1060 # Lock node groups for all potential target nodes optimistically, needs 1061 # verification once nodes have been acquired 1062 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 1063 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 1064 1065 elif level == locking.LEVEL_NODE: 1066 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
1067
1068 - def CheckPrereq(self):
1069 # Verify locks 1070 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 1071 owned_nodes = self.owned_locks(locking.LEVEL_NODE) 1072 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 1073 1074 need_nodes = self._DetermineNodes() 1075 1076 if not owned_nodes.issuperset(need_nodes): 1077 raise errors.OpPrereqError("Nodes in same group as '%s' changed since" 1078 " locks were acquired, current nodes are" 1079 " are '%s', used to be '%s'; retry the" 1080 " operation" % 1081 (self.op.node_name, 1082 utils.CommaJoin(need_nodes), 1083 utils.CommaJoin(owned_nodes)), 1084 errors.ECODE_STATE) 1085 1086 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) 1087 if owned_groups != wanted_groups: 1088 raise errors.OpExecError("Node groups changed since locks were acquired," 1089 " current groups are '%s', used to be '%s';" 1090 " retry the operation" % 1091 (utils.CommaJoin(wanted_groups), 1092 utils.CommaJoin(owned_groups))) 1093 1094 # Determine affected instances 1095 self.instances = self._DetermineInstances() 1096 self.instance_names = [i.name for i in self.instances] 1097 1098 if set(self.instance_names) != owned_instance_names: 1099 raise errors.OpExecError("Instances on node '%s' changed since locks" 1100 " were acquired, current instances are '%s'," 1101 " used to be '%s'; retry the operation" % 1102 (self.op.node_name, 1103 utils.CommaJoin(self.instance_names), 1104 utils.CommaJoin(owned_instance_names))) 1105 1106 if self.instance_names: 1107 self.LogInfo("Evacuating instances from node '%s': %s", 1108 self.op.node_name, 1109 utils.CommaJoin(utils.NiceSort(self.instance_names))) 1110 else: 1111 self.LogInfo("No instances to evacuate from node '%s'", 1112 self.op.node_name) 1113 1114 if self.op.remote_node is not None: 1115 for i in self.instances: 1116 if i.primary_node == self.op.remote_node_uuid: 1117 raise errors.OpPrereqError("Node %s is the primary node of" 1118 " instance %s, cannot use it as" 1119 " secondary" % 1120 (self.op.remote_node, i.name), 1121 errors.ECODE_INVAL)
1122
1123 - def Exec(self, feedback_fn):
1124 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 1125 1126 if not self.instance_names: 1127 # No instances to evacuate 1128 jobs = [] 1129 1130 elif self.op.iallocator is not None: 1131 # TODO: Implement relocation to other group 1132 req = iallocator.IAReqNodeEvac( 1133 evac_mode=self.op.mode, instances=list(self.instance_names), 1134 ignore_soft_errors=self.op.ignore_soft_errors) 1135 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 1136 1137 ial.Run(self.op.iallocator) 1138 1139 if not ial.success: 1140 raise errors.OpPrereqError("Can't compute node evacuation using" 1141 " iallocator '%s': %s" % 1142 (self.op.iallocator, ial.info), 1143 errors.ECODE_NORES) 1144 1145 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1146 1147 elif self.op.remote_node is not None: 1148 assert self.op.mode == constants.NODE_EVAC_SEC 1149 jobs = [ 1150 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1151 remote_node=self.op.remote_node, 1152 disks=[], 1153 mode=constants.REPLACE_DISK_CHG, 1154 early_release=self.op.early_release)] 1155 for instance_name in self.instance_names] 1156 1157 else: 1158 raise errors.ProgrammerError("No iallocator or remote node") 1159 1160 return ResultWithJobs(jobs)
1161 1162
1163 -class LUNodeMigrate(LogicalUnit):
1164 """Migrate all instances from a node. 1165 1166 """ 1167 HPATH = "node-migrate" 1168 HTYPE = constants.HTYPE_NODE 1169 REQ_BGL = False 1170
1171 - def CheckArguments(self):
1172 pass
1173
1174 - def ExpandNames(self):
1175 (self.op.node_uuid, self.op.node_name) = \ 1176 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1177 1178 self.share_locks = ShareAll() 1179 self.needed_locks = { 1180 locking.LEVEL_NODE: [self.op.node_uuid], 1181 }
1182
1183 - def BuildHooksEnv(self):
1184 """Build hooks env. 1185 1186 This runs on the master, the primary and all the secondaries. 1187 1188 """ 1189 return { 1190 "NODE_NAME": self.op.node_name, 1191 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 1192 }
1193
1194 - def BuildHooksNodes(self):
1195 """Build hooks nodes. 1196 1197 """ 1198 nl = [self.cfg.GetMasterNode()] 1199 return (nl, nl)
1200
1201 - def CheckPrereq(self):
1202 pass
1203
1204 - def Exec(self, feedback_fn):
1205 # Prepare jobs for migration instances 1206 jobs = [ 1207 [opcodes.OpInstanceMigrate( 1208 instance_name=inst.name, 1209 mode=self.op.mode, 1210 live=self.op.live, 1211 iallocator=self.op.iallocator, 1212 target_node=self.op.target_node, 1213 allow_runtime_changes=self.op.allow_runtime_changes, 1214 ignore_ipolicy=self.op.ignore_ipolicy)] 1215 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)] 1216 1217 # TODO: Run iallocator in this opcode and pass correct placement options to 1218 # OpInstanceMigrate. Since other jobs can modify the cluster between 1219 # running the iallocator and the actual migration, a good consistency model 1220 # will have to be found. 1221 1222 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == 1223 frozenset([self.op.node_uuid])) 1224 1225 return ResultWithJobs(jobs)
1226 1227
1228 -def _GetStorageTypeArgs(cfg, storage_type):
1229 """Returns the arguments for a storage type. 1230 1231 """ 1232 # Special case for file storage 1233 1234 if storage_type == constants.ST_FILE: 1235 return [[cfg.GetFileStorageDir()]] 1236 elif storage_type == constants.ST_SHARED_FILE: 1237 return [[cfg.GetSharedFileStorageDir()]] 1238 elif storage_type == constants.ST_GLUSTER: 1239 return [[cfg.GetGlusterStorageDir()]] 1240 else: 1241 return []
1242 1243
1244 -class LUNodeModifyStorage(NoHooksLU):
1245 """Logical unit for modifying a storage volume on a node. 1246 1247 """ 1248 REQ_BGL = False 1249
1250 - def CheckArguments(self):
1251 (self.op.node_uuid, self.op.node_name) = \ 1252 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1253 1254 storage_type = self.op.storage_type 1255 1256 try: 1257 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] 1258 except KeyError: 1259 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1260 " modified" % storage_type, 1261 errors.ECODE_INVAL) 1262 1263 diff = set(self.op.changes.keys()) - modifiable 1264 if diff: 1265 raise errors.OpPrereqError("The following fields can not be modified for" 1266 " storage units of type '%s': %r" % 1267 (storage_type, list(diff)), 1268 errors.ECODE_INVAL)
1269
1270 - def CheckPrereq(self):
1271 """Check prerequisites. 1272 1273 """ 1274 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1275
1276 - def ExpandNames(self):
1277 self.needed_locks = { 1278 locking.LEVEL_NODE: self.op.node_uuid, 1279 }
1280
1281 - def Exec(self, feedback_fn):
1282 """Computes the list of nodes and their attributes. 1283 1284 """ 1285 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1286 result = self.rpc.call_storage_modify(self.op.node_uuid, 1287 self.op.storage_type, st_args, 1288 self.op.name, self.op.changes) 1289 result.Raise("Failed to modify storage unit '%s' on %s" % 1290 (self.op.name, self.op.node_name))
1291 1292
1293 -def _CheckOutputFields(fields, selected):
1294 """Checks whether all selected fields are valid according to fields. 1295 1296 @type fields: L{utils.FieldSet} 1297 @param fields: fields set 1298 @type selected: L{utils.FieldSet} 1299 @param selected: fields set 1300 1301 """ 1302 delta = fields.NonMatching(selected) 1303 if delta: 1304 raise errors.OpPrereqError("Unknown output fields selected: %s" 1305 % ",".join(delta), errors.ECODE_INVAL)
1306 1307
1308 -class LUNodeQueryvols(NoHooksLU):
1309 """Logical unit for getting volumes on node(s). 1310 1311 """ 1312 REQ_BGL = False 1313
1314 - def CheckArguments(self):
1319
1320 - def ExpandNames(self):
1321 self.share_locks = ShareAll() 1322 1323 if self.op.nodes: 1324 self.needed_locks = { 1325 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1326 } 1327 else: 1328 self.needed_locks = { 1329 locking.LEVEL_NODE: locking.ALL_SET, 1330 }
1331
1332 - def Exec(self, feedback_fn):
1333 """Computes the list of nodes and their attributes. 1334 1335 """ 1336 node_uuids = self.owned_locks(locking.LEVEL_NODE) 1337 volumes = self.rpc.call_node_volumes(node_uuids) 1338 1339 ilist = self.cfg.GetAllInstancesInfo() 1340 vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values()) 1341 1342 output = [] 1343 for node_uuid in node_uuids: 1344 nresult = volumes[node_uuid] 1345 if nresult.offline: 1346 continue 1347 msg = nresult.fail_msg 1348 if msg: 1349 self.LogWarning("Can't compute volume data on node %s: %s", 1350 self.cfg.GetNodeName(node_uuid), msg) 1351 continue 1352 1353 node_vols = sorted(nresult.payload, 1354 key=operator.itemgetter(constants.VF_DEV)) 1355 1356 for vol in node_vols: 1357 node_output = [] 1358 for field in self.op.output_fields: 1359 if field == constants.VF_NODE: 1360 val = self.cfg.GetNodeName(node_uuid) 1361 elif field == constants.VF_PHYS: 1362 val = vol[constants.VF_DEV] 1363 elif field == constants.VF_VG: 1364 val = vol[constants.VF_VG] 1365 elif field == constants.VF_NAME: 1366 val = vol[constants.VF_NAME] 1367 elif field == constants.VF_SIZE: 1368 val = int(float(vol[constants.VF_SIZE])) 1369 elif field == constants.VF_INSTANCE: 1370 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" + 1371 vol[constants.VF_NAME]), None) 1372 if inst is not None: 1373 val = inst.name 1374 else: 1375 val = "-" 1376 else: 1377 raise errors.ParameterError(field) 1378 node_output.append(str(val)) 1379 1380 output.append(node_output) 1381 1382 return output
1383 1384
1385 -class LUNodeQueryStorage(NoHooksLU):
1386 """Logical unit for getting information on storage units on node(s). 1387 1388 """ 1389 REQ_BGL = False 1390
1391 - def CheckArguments(self):
1392 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS), 1393 self.op.output_fields)
1394
1395 - def ExpandNames(self):
1396 self.share_locks = ShareAll() 1397 1398 if self.op.nodes: 1399 self.needed_locks = { 1400 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1401 } 1402 else: 1403 self.needed_locks = { 1404 locking.LEVEL_NODE: locking.ALL_SET, 1405 }
1406
1407 - def _DetermineStorageType(self):
1408 """Determines the default storage type of the cluster. 1409 1410 """ 1411 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates 1412 default_storage_type = \ 1413 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]] 1414 return default_storage_type
1415
1416 - def CheckPrereq(self):
1417 """Check prerequisites. 1418 1419 """ 1420 if self.op.storage_type: 1421 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1422 self.storage_type = self.op.storage_type 1423 else: 1424 self.storage_type = self._DetermineStorageType() 1425 supported_storage_types = constants.STS_REPORT_NODE_STORAGE 1426 if self.storage_type not in supported_storage_types: 1427 raise errors.OpPrereqError( 1428 "Storage reporting for storage type '%s' is not supported. Please" 1429 " use the --storage-type option to specify one of the supported" 1430 " storage types (%s) or set the default disk template to one that" 1431 " supports storage reporting." % 1432 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1433
1434 - def Exec(self, feedback_fn):
1435 """Computes the list of nodes and their attributes. 1436 1437 """ 1438 if self.op.storage_type: 1439 self.storage_type = self.op.storage_type 1440 else: 1441 self.storage_type = self._DetermineStorageType() 1442 1443 self.node_uuids = self.owned_locks(locking.LEVEL_NODE) 1444 1445 # Always get name to sort by 1446 if constants.SF_NAME in self.op.output_fields: 1447 fields = self.op.output_fields[:] 1448 else: 1449 fields = [constants.SF_NAME] + self.op.output_fields 1450 1451 # Never ask for node or type as it's only known to the LU 1452 for extra in [constants.SF_NODE, constants.SF_TYPE]: 1453 while extra in fields: 1454 fields.remove(extra) 1455 1456 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) 1457 name_idx = field_idx[constants.SF_NAME] 1458 1459 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type) 1460 data = self.rpc.call_storage_list(self.node_uuids, 1461 self.storage_type, st_args, 1462 self.op.name, fields) 1463 1464 result = [] 1465 1466 for node_uuid in utils.NiceSort(self.node_uuids): 1467 node_name = self.cfg.GetNodeName(node_uuid) 1468 nresult = data[node_uuid] 1469 if nresult.offline: 1470 continue 1471 1472 msg = nresult.fail_msg 1473 if msg: 1474 self.LogWarning("Can't get storage data from node %s: %s", 1475 node_name, msg) 1476 continue 1477 1478 rows = dict([(row[name_idx], row) for row in nresult.payload]) 1479 1480 for name in utils.NiceSort(rows.keys()): 1481 row = rows[name] 1482 1483 out = [] 1484 1485 for field in self.op.output_fields: 1486 if field == constants.SF_NODE: 1487 val = node_name 1488 elif field == constants.SF_TYPE: 1489 val = self.storage_type 1490 elif field in field_idx: 1491 val = row[field_idx[field]] 1492 else: 1493 raise errors.ParameterError(field) 1494 1495 out.append(val) 1496 1497 result.append(out) 1498 1499 return result
1500 1501
1502 -class LUNodeRemove(LogicalUnit):
1503 """Logical unit for removing a node. 1504 1505 """ 1506 HPATH = "node-remove" 1507 HTYPE = constants.HTYPE_NODE 1508
1509 - def BuildHooksEnv(self):
1510 """Build hooks env. 1511 1512 """ 1513 return { 1514 "OP_TARGET": self.op.node_name, 1515 "NODE_NAME": self.op.node_name, 1516 }
1517
1518 - def BuildHooksNodes(self):
1519 """Build hooks nodes. 1520 1521 This doesn't run on the target node in the pre phase as a failed 1522 node would then be impossible to remove. 1523 1524 """ 1525 all_nodes = self.cfg.GetNodeList() 1526 try: 1527 all_nodes.remove(self.op.node_uuid) 1528 except ValueError: 1529 pass 1530 return (all_nodes, all_nodes)
1531
1532 - def CheckPrereq(self):
1533 """Check prerequisites. 1534 1535 This checks: 1536 - the node exists in the configuration 1537 - it does not have primary or secondary instances 1538 - it's not the master 1539 1540 Any errors are signaled by raising errors.OpPrereqError. 1541 1542 """ 1543 (self.op.node_uuid, self.op.node_name) = \ 1544 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1545 node = self.cfg.GetNodeInfo(self.op.node_uuid) 1546 assert node is not None 1547 1548 masternode = self.cfg.GetMasterNode() 1549 if node.uuid == masternode: 1550 raise errors.OpPrereqError("Node is the master node, failover to another" 1551 " node is required", errors.ECODE_INVAL) 1552 1553 for _, instance in self.cfg.GetAllInstancesInfo().items(): 1554 if node.uuid in self.cfg.GetInstanceNodes(instance.uuid): 1555 raise errors.OpPrereqError("Instance %s is still running on the node," 1556 " please remove first" % instance.name, 1557 errors.ECODE_INVAL) 1558 self.op.node_name = node.name 1559 self.node = node
1560
1561 - def Exec(self, feedback_fn):
1562 """Removes the node from the cluster. 1563 1564 """ 1565 logging.info("Stopping the node daemon and removing configs from node %s", 1566 self.node.name) 1567 1568 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1569 1570 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 1571 "Not owning BGL" 1572 1573 if modify_ssh_setup: 1574 # retrieve the list of potential master candidates before the node is 1575 # removed 1576 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 1577 potential_master_candidate = \ 1578 self.op.node_name in potential_master_candidates 1579 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 1580 master_node = self.cfg.GetMasterNode() 1581 result = self.rpc.call_node_ssh_key_remove( 1582 [master_node], 1583 self.node.uuid, self.op.node_name, 1584 master_candidate_uuids, potential_master_candidates, 1585 self.node.master_candidate, # from_authorized_keys 1586 potential_master_candidate, # from_public_keys 1587 True, # clear node's 'authorized_keys' 1588 True, # clear node's 'ganeti_public_keys' 1589 False) # no readd 1590 result[master_node].Raise( 1591 "Could not remove the SSH key of node '%s' (UUID: %s)." % 1592 (self.op.node_name, self.node.uuid)) 1593 WarnAboutFailedSshUpdates(result, master_node, feedback_fn) 1594 1595 # Promote nodes to master candidate as needed 1596 AdjustCandidatePool(self, [self.node.uuid]) 1597 self.context.RemoveNode(self.cfg, self.node) 1598 1599 # Run post hooks on the node before it's removed 1600 RunPostHook(self, self.node.name) 1601 1602 # we have to call this by name rather than by UUID, as the node is no longer 1603 # in the config 1604 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup) 1605 msg = result.fail_msg 1606 if msg: 1607 self.LogWarning("Errors encountered on the remote node while leaving" 1608 " the cluster: %s", msg) 1609 1610 cluster = self.cfg.GetClusterInfo() 1611 1612 # Remove node from candidate certificate list 1613 if self.node.master_candidate: 1614 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid) 1615 1616 # Remove node from our /etc/hosts 1617 if cluster.modify_etc_hosts: 1618 master_node_uuid = self.cfg.GetMasterNode() 1619 result = self.rpc.call_etc_hosts_modify(master_node_uuid, 1620 constants.ETC_HOSTS_REMOVE, 1621 self.node.name, None) 1622 result.Raise("Can't update hosts file with new host data") 1623 RedistributeAncillaryFiles(self)
1624 1625
1626 -class LURepairNodeStorage(NoHooksLU):
1627 """Repairs the volume group on a node. 1628 1629 """ 1630 REQ_BGL = False 1631
1632 - def CheckArguments(self):
1633 (self.op.node_uuid, self.op.node_name) = \ 1634 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1635 1636 storage_type = self.op.storage_type 1637 1638 if (constants.SO_FIX_CONSISTENCY not in 1639 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): 1640 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1641 " repaired" % storage_type, 1642 errors.ECODE_INVAL)
1643
1644 - def ExpandNames(self):
1645 self.needed_locks = { 1646 locking.LEVEL_NODE: [self.op.node_uuid], 1647 }
1648
1649 - def _CheckFaultyDisks(self, instance, node_uuid):
1650 """Ensure faulty disks abort the opcode or at least warn.""" 1651 try: 1652 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance, 1653 node_uuid, True): 1654 raise errors.OpPrereqError("Instance '%s' has faulty disks on" 1655 " node '%s'" % 1656 (instance.name, 1657 self.cfg.GetNodeName(node_uuid)), 1658 errors.ECODE_STATE) 1659 except errors.OpPrereqError, err: 1660 if self.op.ignore_consistency: 1661 self.LogWarning(str(err.args[0])) 1662 else: 1663 raise
1664
1665 - def CheckPrereq(self):
1666 """Check prerequisites. 1667 1668 """ 1669 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1670 1671 # Check whether any instance on this node has faulty disks 1672 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid): 1673 if not inst.disks_active: 1674 continue 1675 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid)) 1676 check_nodes.discard(self.op.node_uuid) 1677 for inst_node_uuid in check_nodes: 1678 self._CheckFaultyDisks(inst, inst_node_uuid)
1679
1680 - def Exec(self, feedback_fn):
1681 feedback_fn("Repairing storage unit '%s' on %s ..." % 1682 (self.op.name, self.op.node_name)) 1683 1684 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1685 result = self.rpc.call_storage_execute(self.op.node_uuid, 1686 self.op.storage_type, st_args, 1687 self.op.name, 1688 constants.SO_FIX_CONSISTENCY) 1689 result.Raise("Failed to repair storage unit '%s' on %s" % 1690 (self.op.name, self.op.node_name))
1691