Package ganeti :: Package cmdlib :: Module node
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes, WarnAboutFailedSshUpdates 
  57   
  58   
59 -def _DecideSelfPromotion(lu, exceptions=None):
60 """Decide whether I should promote myself as a master candidate. 61 62 """ 63 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size 64 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) 65 # the new node will increase mc_max with one, so: 66 mc_should = min(mc_should + 1, cp_size) 67 return mc_now < mc_should
68 69
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  ip_check = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  ip_check.Raise("Failure checking secondary ip on node %s" % node.name,
                 prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if ip_check.payload:
    return

  msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
         " please fix and re-run this command" % secondary_ip)
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  raise errors.OpExecError(msg)
98 99
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  # boolean op attributes that are copied verbatim onto the node object
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    """Check and normalize the opcode arguments.

    Resolves/normalizes the node name and rejects invalid readd
    combinations (readding the master, or passing a node group on readd).

    """
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    """Extend the post-phase hook nodes with the newly added node.

    By the time the post hooks run, the node has been added to the
    configuration and thus has a UUID (self.new_node is set in CheckPrereq).

    """
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      # single-homed setup: secondary defaults to the primary address
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    # check that the new node's addresses do not collide with any other node;
    # on readd, only the primary ip may change (secondary must stay the same)
    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      # readd: unspecified flags default to the node's previous values
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      # fresh add: unspecified flags default to True
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    # on readd, don't count the node's own previous entry in the MC stats
    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      # reuse the existing node object; it keeps its group membership
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    # verify the new node's physical volumes if the cluster uses LVM
    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: self.node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    """Configure OpenvSwitch on the new node if its ndparams request it.

    Best-effort warning is emitted when no physical link was given; the
    actual setup happens via the node_configure_ovs RPC.

    """
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    @type new_node_uuid: string
    @param new_node_uuid: UUID of the node being added
    @type new_node_name: string
    @param new_node_name: name of the node being added
    @type is_master_candidate: boolean
    @param is_master_candidate: whether the node will be a master candidate
    @type is_potential_master_candidate: boolean
    @param is_potential_master_candidate: whether the node is a potential
        master candidate
    @param rpcrunner: RPC runner used for the key add/remove calls
    @type readd: boolean
    @param readd: whether or not this node is readded
    @param feedback_fn: function used to relay warnings to the user

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
      # clear previous keys
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True, # from authorized keys
        True, # from public keys
        False, # clear authorized keys
        True, # clear public keys
        True) # it's a readd
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We adding a new node so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
          master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
          self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    # verify connectivity (ssh/hostname) from the master to the new node
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
        node_verifier_uuids, node_verify_param,
        self.cfg.GetClusterName(),
        self.cfg.GetClusterInfo().hvparams,
        {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
        self.cfg.GetAllNodeGroupsInfoDict()
      )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # We create a new certificate even if the node is readded
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # Update SSH setup of all nodes
    if self.op.node_setup:
      # FIXME: so far, all nodes are considered potential master candidates
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)
486 487
488 -class LUNodeSetParams(LogicalUnit):
489 """Modifies the parameters of a node. 490 491 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 492 to the node role (as _ROLE_*) 493 @cvar _R2F: a dictionary from node role to tuples of flags 494 @cvar _FLAGS: a list of attribute names corresponding to the flags 495 496 """ 497 HPATH = "node-modify" 498 HTYPE = constants.HTYPE_NODE 499 REQ_BGL = False 500 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 501 _F2R = { 502 (True, False, False): _ROLE_CANDIDATE, 503 (False, True, False): _ROLE_DRAINED, 504 (False, False, True): _ROLE_OFFLINE, 505 (False, False, False): _ROLE_REGULAR, 506 } 507 _R2F = dict((v, k) for k, v in _F2R.items()) 508 _FLAGS = ["master_candidate", "drained", "offline"] 509
510 - def CheckArguments(self):
511 (self.op.node_uuid, self.op.node_name) = \ 512 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 513 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 514 self.op.master_capable, self.op.vm_capable, 515 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 516 self.op.disk_state] 517 if all_mods.count(None) == len(all_mods): 518 raise errors.OpPrereqError("Please pass at least one modification", 519 errors.ECODE_INVAL) 520 if all_mods.count(True) > 1: 521 raise errors.OpPrereqError("Can't set the node into more than one" 522 " state at the same time", 523 errors.ECODE_INVAL) 524 525 # Boolean value that tells us whether we might be demoting from MC 526 self.might_demote = (self.op.master_candidate is False or 527 self.op.offline is True or 528 self.op.drained is True or 529 self.op.master_capable is False) 530 531 if self.op.secondary_ip: 532 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 533 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 534 " address" % self.op.secondary_ip, 535 errors.ECODE_INVAL) 536 537 self.lock_all = self.op.auto_promote and self.might_demote 538 self.lock_instances = self.op.secondary_ip is not None
539
540 - def _InstanceFilter(self, instance):
541 """Filter for getting affected instances. 542 543 """ 544 return (instance.disk_template in constants.DTS_INT_MIRROR and 545 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
546
547 - def ExpandNames(self):
548 if self.lock_all: 549 self.needed_locks = { 550 locking.LEVEL_NODE: locking.ALL_SET, 551 552 # Block allocations when all nodes are locked 553 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 554 } 555 else: 556 self.needed_locks = { 557 locking.LEVEL_NODE: self.op.node_uuid, 558 } 559 560 # Since modifying a node can have severe effects on currently running 561 # operations the resource lock is at least acquired in shared mode 562 self.needed_locks[locking.LEVEL_NODE_RES] = \ 563 self.needed_locks[locking.LEVEL_NODE] 564 565 # Get all locks except nodes in shared mode; they are not used for anything 566 # but read-only access 567 self.share_locks = ShareAll() 568 self.share_locks[locking.LEVEL_NODE] = 0 569 self.share_locks[locking.LEVEL_NODE_RES] = 0 570 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 571 572 if self.lock_instances: 573 self.needed_locks[locking.LEVEL_INSTANCE] = \ 574 self.cfg.GetInstanceNames( 575 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
576
577 - def BuildHooksEnv(self):
578 """Build hooks env. 579 580 This runs on the master node. 581 582 """ 583 return { 584 "OP_TARGET": self.op.node_name, 585 "MASTER_CANDIDATE": str(self.op.master_candidate), 586 "OFFLINE": str(self.op.offline), 587 "DRAINED": str(self.op.drained), 588 "MASTER_CAPABLE": str(self.op.master_capable), 589 "VM_CAPABLE": str(self.op.vm_capable), 590 }
591
592 - def BuildHooksNodes(self):
593 """Build hooks nodes. 594 595 """ 596 nl = [self.cfg.GetMasterNode(), self.op.node_uuid] 597 return (nl, nl)
598
599 - def CheckPrereq(self):
600 """Check prerequisites. 601 602 This only checks the instance list against the existing names. 603 604 """ 605 node = self.cfg.GetNodeInfo(self.op.node_uuid) 606 if self.lock_instances: 607 affected_instances = \ 608 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 609 610 # Verify instance locks 611 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 612 wanted_instance_names = frozenset([inst.name for inst in 613 affected_instances.values()]) 614 if wanted_instance_names - owned_instance_names: 615 raise errors.OpPrereqError("Instances affected by changing node %s's" 616 " secondary IP address have changed since" 617 " locks were acquired, wanted '%s', have" 618 " '%s'; retry the operation" % 619 (node.name, 620 utils.CommaJoin(wanted_instance_names), 621 utils.CommaJoin(owned_instance_names)), 622 errors.ECODE_STATE) 623 else: 624 affected_instances = None 625 626 if (self.op.master_candidate is not None or 627 self.op.drained is not None or 628 self.op.offline is not None): 629 # we can't change the master's node flags 630 if node.uuid == self.cfg.GetMasterNode(): 631 raise errors.OpPrereqError("The master role can be changed" 632 " only via master-failover", 633 errors.ECODE_INVAL) 634 635 if self.op.master_candidate and not node.master_capable: 636 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 637 " it a master candidate" % node.name, 638 errors.ECODE_STATE) 639 640 if self.op.vm_capable is False: 641 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) 642 if ipri or isec: 643 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 644 " the vm_capable flag" % node.name, 645 errors.ECODE_STATE) 646 647 if node.master_candidate and self.might_demote and not self.lock_all: 648 assert not self.op.auto_promote, "auto_promote set but lock_all not" 649 # check if after removing the current node, we're missing master 650 # candidates 651 (mc_remaining, mc_should, _) = \ 652 
self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) 653 if mc_remaining < mc_should: 654 raise errors.OpPrereqError("Not enough master candidates, please" 655 " pass auto promote option to allow" 656 " promotion (--auto-promote or RAPI" 657 " auto_promote=True)", errors.ECODE_STATE) 658 659 self.old_flags = old_flags = (node.master_candidate, 660 node.drained, node.offline) 661 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 662 self.old_role = old_role = self._F2R[old_flags] 663 664 # Check for ineffective changes 665 for attr in self._FLAGS: 666 if getattr(self.op, attr) is False and getattr(node, attr) is False: 667 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 668 setattr(self.op, attr, None) 669 670 # Past this point, any flag change to False means a transition 671 # away from the respective state, as only real changes are kept 672 673 # TODO: We might query the real power state if it supports OOB 674 if SupportsOob(self.cfg, node): 675 if self.op.offline is False and not (node.powered or 676 self.op.powered is True): 677 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 678 " offline status can be reset") % 679 self.op.node_name, errors.ECODE_STATE) 680 elif self.op.powered is not None: 681 raise errors.OpPrereqError(("Unable to change powered state for node %s" 682 " as it does not support out-of-band" 683 " handling") % self.op.node_name, 684 errors.ECODE_STATE) 685 686 # If we're being deofflined/drained, we'll MC ourself if needed 687 if (self.op.drained is False or self.op.offline is False or 688 (self.op.master_capable and not node.master_capable)): 689 if _DecideSelfPromotion(self): 690 self.op.master_candidate = True 691 self.LogInfo("Auto-promoting node to master candidate") 692 693 # If we're no longer master capable, we'll demote ourselves from MC 694 if self.op.master_capable is False and node.master_candidate: 695 if self.op.node_uuid == self.cfg.GetMasterNode(): 696 
raise errors.OpPrereqError("Master must remain master capable", 697 errors.ECODE_STATE) 698 self.LogInfo("Demoting from master candidate") 699 self.op.master_candidate = False 700 701 # Compute new role 702 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 703 if self.op.master_candidate: 704 new_role = self._ROLE_CANDIDATE 705 elif self.op.drained: 706 new_role = self._ROLE_DRAINED 707 elif self.op.offline: 708 new_role = self._ROLE_OFFLINE 709 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 710 # False is still in new flags, which means we're un-setting (the 711 # only) True flag 712 new_role = self._ROLE_REGULAR 713 else: # no new flags, nothing, keep old role 714 new_role = old_role 715 716 self.new_role = new_role 717 718 if old_role == self._ROLE_OFFLINE and new_role != old_role: 719 # Trying to transition out of offline status 720 result = self.rpc.call_version([node.uuid])[node.uuid] 721 if result.fail_msg: 722 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 723 " to report its version: %s" % 724 (node.name, result.fail_msg), 725 errors.ECODE_STATE) 726 else: 727 self.LogWarning("Transitioning node from offline to online state" 728 " without using re-add. Please make sure the node" 729 " is healthy!") 730 731 # When changing the secondary ip, verify if this is a single-homed to 732 # multi-homed transition or vice versa, and apply the relevant 733 # restrictions. 
734 if self.op.secondary_ip: 735 # Ok even without locking, because this can't be changed by any LU 736 master = self.cfg.GetMasterNodeInfo() 737 master_singlehomed = master.secondary_ip == master.primary_ip 738 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 739 if self.op.force and node.uuid == master.uuid: 740 self.LogWarning("Transitioning from single-homed to multi-homed" 741 " cluster; all nodes will require a secondary IP" 742 " address") 743 else: 744 raise errors.OpPrereqError("Changing the secondary ip on a" 745 " single-homed cluster requires the" 746 " --force option to be passed, and the" 747 " target node to be the master", 748 errors.ECODE_INVAL) 749 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 750 if self.op.force and node.uuid == master.uuid: 751 self.LogWarning("Transitioning from multi-homed to single-homed" 752 " cluster; secondary IP addresses will have to be" 753 " removed") 754 else: 755 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 756 " same as the primary IP on a multi-homed" 757 " cluster, unless the --force option is" 758 " passed, and the target node is the" 759 " master", errors.ECODE_INVAL) 760 761 assert not (set([inst.name for inst in affected_instances.values()]) - 762 self.owned_locks(locking.LEVEL_INSTANCE)) 763 764 if node.offline: 765 if affected_instances: 766 msg = ("Cannot change secondary IP address: offline node has" 767 " instances (%s) configured to use it" % 768 utils.CommaJoin( 769 [inst.name for inst in affected_instances.values()])) 770 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 771 else: 772 # On online nodes, check that no instances are running, and that 773 # the node has the new ip and we can reach it. 
774 for instance in affected_instances.values(): 775 CheckInstanceState(self, instance, INSTANCE_DOWN, 776 msg="cannot change secondary ip") 777 778 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) 779 if master.uuid != node.uuid: 780 # check reachability from master secondary ip to new secondary ip 781 if not netutils.TcpPing(self.op.secondary_ip, 782 constants.DEFAULT_NODED_PORT, 783 source=master.secondary_ip): 784 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 785 " based ping to node daemon port", 786 errors.ECODE_ENVIRON) 787 788 if self.op.ndparams: 789 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) 790 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 791 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 792 "node", "cluster or group") 793 self.new_ndparams = new_ndparams 794 795 if self.op.hv_state: 796 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 797 node.hv_state_static) 798 799 if self.op.disk_state: 800 self.new_disk_state = \ 801 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
802
  def Exec(self, feedback_fn):
    """Modifies a node.

    Applies the changes validated in CheckPrereq: node parameters, power
    state, static hypervisor/disk state, capability flags, secondary IP
    and, last, the node role (master candidate/drained/offline) together
    with all side effects of a role change (candidate pool adjustment,
    candidate certificates, kvmd and the SSH setup).

    @param feedback_fn: function used to report progress back to the caller
    @return: list of C{(attribute, new value)} pairs describing the changes

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    # List of (attribute, new value) pairs, returned to the caller
    result = []

    # Simple attribute updates; the new values were computed/validated in
    # CheckPrereq (self.new_ndparams etc.)
    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # Role change: translate the new role back into the three flags and
    # perform all role-change side effects
    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # Tell the node to demote itself, if no longer MC and not offline.
      # This must be done only after the configuration is updated so that
      # it's ensured the node won't receive any further configuration updates.
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          # best-effort: only warn, the node may be unreachable
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # if node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

      # this will trigger job queue propagation or cleanup if the mc
      # flag changed
      if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
        self.context.ReaddNode(node)

      # Keep the cluster-wide SSH key distribution in sync with the
      # master-candidate status; the RPC runs on the master node only
      if self.cfg.GetClusterInfo().modify_ssh_setup:
        potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
        master_node = self.cfg.GetMasterNode()
        if self.old_role == self._ROLE_CANDIDATE:
          master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
          ssh_result = self.rpc.call_node_ssh_key_remove(
            [master_node],
            node.uuid, node.name,
            master_candidate_uuids, potential_master_candidates,
            True, # remove node's key from all nodes' authorized_keys file
            False, # currently, all nodes are potential master candidates
            False, # do not clear node's 'authorized_keys'
            False, # do not clear node's 'ganeti_pub_keys'
            False) # no readd
          ssh_result[master_node].Raise(
            "Could not adjust the SSH setup after demoting node '%s'"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

        if self.new_role == self._ROLE_CANDIDATE:
          ssh_result = self.rpc.call_node_ssh_key_add(
            [master_node], node.uuid, node.name,
            potential_master_candidates,
            True, # add node's key to all node's 'authorized_keys'
            True, # all nodes are potential master candidates
            False) # do not update the node's public keys
          ssh_result[master_node].Raise(
            "Could not update the SSH setup of node '%s' after promotion"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

    return result
902 903
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolve the target node and refuse to powercycle the master.

    Powercycling the master node is only allowed with the force flag.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    target_is_master = (self.op.node_uuid == self.cfg.GetMasterNode())
    if target_is_master and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    hv_type = self.cfg.GetHypervisorType()
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams[hv_type]
    rpc_result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                               hv_type,
                                               cluster_hvparams)
    rpc_result.Raise("Failed to schedule the reboot")
    return rpc_result.payload
939 940
941 -def _GetNodeInstancesInner(cfg, fn):
942 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
943 944
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  def _HasPrimaryHere(inst):
    return inst.primary_node == node_uuid

  return _GetNodeInstancesInner(cfg, _HasPrimaryHere)
951 952
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  def _HasSecondaryHere(inst):
    return node_uuid in cfg.GetInstanceSecondaryNodes(inst.uuid)

  return _GetNodeInstancesInner(cfg, _HasSecondaryHere)
960 961
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  @param cfg: the cluster configuration
  @param node_uuid: UUID of the node to look up instances for
  @return: list of instances having C{node_uuid} among their nodes

  """
  # FIX: pass the instance's UUID ("inst.uuid"), not "inst.uuid.uuid";
  # the double dereference raised AttributeError on every call.  The
  # sibling helpers (_GetNodeSecondaryInstances) use "inst.uuid" as well.
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceNodes(inst.uuid))
970 971
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  Depending on C{op.mode}, either the node's primary instances are moved
  via an iallocator, or its secondary instances are moved via disk
  replacement (optionally to an explicitly given remote node).

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Exactly one of iallocator/remote_node must be specified."""
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    """Resolve node names and declare (initially empty) lock levels."""
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      # With an explicit target node only secondary evacuation makes sense
      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    @return: set of node UUIDs to lock (evacuated node plus potential
        targets)

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    """Fill in the per-level lock sets computed optimistically."""
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    """Re-verify the optimistically acquired locks and pick the instances."""
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      # FIX: error message previously contained a duplicated word
      # ("current nodes are are")
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      # The target node must not already be the primary of any instance
      # being moved, as it is supposed to become the new secondary
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Builds and submits the evacuation jobs.

    @return: L{ResultWithJobs} wrapping one job per instance

    """
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1162 1163
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    # No argument-level checks needed
    pass

  def ExpandNames(self):
    """Resolve the node and declare a shared lock on it."""
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {locking.LEVEL_NODE: [self.op.node_uuid]}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = [self.cfg.GetMasterNode()]
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    # All verification happens in the submitted per-instance jobs
    pass

  def Exec(self, feedback_fn):
    """Submit one migration job per primary instance of the node."""
    jobs = []
    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid):
      jobs.append([opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)])

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
1227 1228
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  File-based storage types take their configured storage directory as the
  single argument; all other types take no arguments.

  """
  # Special case for file storage
  dir_getters = {
    constants.ST_FILE: cfg.GetFileStorageDir,
    constants.ST_SHARED_FILE: cfg.GetSharedFileStorageDir,
    constants.ST_GLUSTER: cfg.GetGlusterStorageDir,
    }
  getter = dir_getters.get(storage_type)
  if getter is None:
    return []
  return [[getter()]]
1243 1244
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Verify the storage type is modifiable and only known fields change."""
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    stype = self.op.storage_type

    modifiable = constants.MODIFIABLE_STORAGE_FIELDS.get(stype)
    if modifiable is None:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % stype,
                                 errors.ECODE_INVAL)

    unsupported = set(self.op.changes.keys()) - modifiable
    if unsupported:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (stype, list(unsupported)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    type_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    rpc_result = self.rpc.call_storage_modify(self.op.node_uuid,
                                              self.op.storage_type, type_args,
                                              self.op.name, self.op.changes)
    rpc_result.Raise("Failed to modify storage unit '%s' on %s" %
                     (self.op.name, self.op.node_name))
1292 1293
1294 -def _CheckOutputFields(fields, selected):
1295 """Checks whether all selected fields are valid according to fields. 1296 1297 @type fields: L{utils.FieldSet} 1298 @param fields: fields set 1299 @type selected: L{utils.FieldSet} 1300 @param selected: fields set 1301 1302 """ 1303 delta = fields.NonMatching(selected) 1304 if delta: 1305 raise errors.OpPrereqError("Unknown output fields selected: %s" 1306 % ",".join(delta), errors.ECODE_INVAL)
1307 1308
1309 -class LUNodeQueryvols(NoHooksLU):
1310 """Logical unit for getting volumes on node(s). 1311 1312 """ 1313 REQ_BGL = False 1314
1315 - def CheckArguments(self):
1320
1321 - def ExpandNames(self):
1322 self.share_locks = ShareAll() 1323 1324 if self.op.nodes: 1325 self.needed_locks = { 1326 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1327 } 1328 else: 1329 self.needed_locks = { 1330 locking.LEVEL_NODE: locking.ALL_SET, 1331 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1332 }
1333
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Queries all locked nodes for their volumes via RPC and formats one
    output row per volume, containing the requested output fields.

    @return: list of rows, each a list of stringified field values

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    # Map (node UUID, LV name) to the owning instance, for the "instance"
    # output field
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      # Offline nodes are silently skipped; RPC failures only warn
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      # Sort volumes by device name for stable output
      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            # size is reported as a float string; truncate to whole units
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            # look up by "vg/name"; "-" when no instance owns the volume
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1385 1386
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    # Reject unknown output fields early
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    # Shared locks on the requested nodes, or on all nodes (plus the
    # node-allocation lock) when no explicit list was given
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    Derived from the first enabled disk template.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    An explicitly requested storage type only needs to be enabled; a
    type derived from the default disk template must additionally
    support storage reporting.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    @return: list of rows, one per storage unit, each a list of the
        requested output field values

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    # Map remaining RPC field names to their position in the result rows
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      # Offline nodes are skipped; RPC failures only warn
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
1503 1504
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    # Refuse removal while any instance still uses the node (as primary
    # or secondary)
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    Order matters here: SSH keys are revoked first (while the node is
    still in the configuration), then the node is removed from the
    configuration, post hooks run, and finally the node daemon is told
    to leave the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    if modify_ssh_setup:
      # retrieve the list of potential master candidates before the node is
      # removed
      potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate, # from_authorized_keys
        potential_master_candidate, # from_public_keys
        True, # clear node's 'authorized_keys'
        True, # clear node's 'ganeti_public_keys'
        False) # no readd
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.cfg, self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      # best-effort: the node may already be unreachable at this point
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1627 1628
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Verify the storage type supports the fix-consistency operation."""
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    # FIX: use the "except ... as err" form; the old Python 2-only
    # "except E, err" comma syntax is a SyntaxError under Python 3
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        # demote the error to a warning when consistency is ignored
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Run the fix-consistency storage operation on the node."""
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))