Package ganeti :: Package cmdlib :: Module node
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Logical units dealing with nodes.""" 
  23   
  24  import logging 
  25  import operator 
  26   
  27  from ganeti import constants 
  28  from ganeti import errors 
  29  from ganeti import locking 
  30  from ganeti import netutils 
  31  from ganeti import objects 
  32  from ganeti import opcodes 
  33  from ganeti import qlang 
  34  from ganeti import query 
  35  from ganeti import rpc 
  36  from ganeti import utils 
  37  from ganeti.masterd import iallocator 
  38   
  39  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \ 
  40    ResultWithJobs 
  41  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  42    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  43    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  44    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  45    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  46    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  47    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  48    FindFaultyInstanceDisks, CheckStorageTypeEnabled 
  49   
  50   
def _DecideSelfPromotion(lu, exceptions=None):
  """Tell whether the calling node should become a master candidate.

  @param lu: logical unit whose C{cfg} is queried for the cluster's
      candidate pool size and for the master candidate statistics
  @param exceptions: handed over unchanged to
      C{cfg.GetMasterCandidateStats}
  @rtype: boolean
  @return: True if the current number of master candidates is lower than
      the desired number once one more node is accounted for

  """
  pool_size = lu.cfg.GetClusterInfo().candidate_pool_size
  (current, desired, _) = lu.cfg.GetMasterCandidateStats(exceptions)
  # one more node raises the desired count by one, capped at the pool size
  target = min(desired + 1, pool_size)
  return current < target
60 61
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Verify via RPC that a node owns the given secondary IP address.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # The node may be in the process of being added and thus have no UUID
  # yet, which is why the RPC is addressed by node name
  answer = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  answer.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if answer.payload:
    return

  text = ("Node claims it doesn't have the secondary ip you gave (%s),"
          " please fix and re-run this command" % secondary_ip)
  if not prereq:
    raise errors.OpExecError(text)
  raise errors.OpPrereqError(text, errors.ECODE_ENVIRON)
90 91
class LUNodeAdd(LogicalUnit):
  """Logical unit that adds a node to the cluster or re-adds a known one.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    """Resolve the node name and reject inconsistent re-add requests.

    """
    ip_family = self.cfg.GetPrimaryIPFamily()
    self.primary_ip_family = ip_family
    # the resolved name replaces whatever was passed in the opcode
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=ip_family)
    self.op.node_name = self.hostname.name

    if not self.op.readd:
      return

    if self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    op = self.op
    env = {}
    env["OP_TARGET"] = op.node_name
    env["NODE_NAME"] = op.node_name
    env["NODE_PIP"] = op.primary_ip
    env["NODE_SIP"] = op.secondary_ip
    env["MASTER_CAPABLE"] = str(op.master_capable)
    env["VM_CAPABLE"] = str(op.vm_capable)
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: tuple of (pre-hook nodes, post-hook nodes, post-hook nodes
        given by name)

    """
    node_uuids = self.cfg.GetNodeList()
    known = self.cfg.GetNodeInfoByName(self.op.node_name)
    if known is not None:
      # a node that is already configured is not addressed by UUID here
      node_uuids = list(set(node_uuids) - set([known.uuid]))

    # the node being added gets its post hook by name, it has no UUID yet
    post_by_name = [self.op.node_name]
    return (node_uuids, node_uuids, post_by_name)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    op = self.op
    name = self.hostname.name
    primary_ip = self.hostname.ip
    op.primary_ip = primary_ip

    if op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      op.secondary_ip = primary_ip

    secondary_ip = op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    # a plain add needs an unknown node, a re-add needs a known one
    known_node = self.cfg.GetNodeInfoByName(name)
    if op.readd:
      if known_node is None:
        raise errors.OpPrereqError("Node %s is not in the configuration" %
                                   name, errors.ECODE_NOENT)
    elif known_node is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 name, errors.ECODE_EXISTS)

    self.changed_primary_ip = False

    for other in self.cfg.GetAllNodesInfo().values():
      if op.readd and name == other.name:
        # this is the node being re-added: compare against its old data
        if other.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if other.primary_ip != primary_ip:
          self.changed_primary_ip = True
        continue

      # neither new address may be in use by any other node
      taken = (other.primary_ip, other.secondary_ip)
      if primary_ip in taken or secondary_ip in taken:
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % other.name,
                                   errors.ECODE_NOTUNIQUE)

    # Once this loop is done, None is no longer a valid value for the
    # _capable op attributes: unset flags are taken from the old node object
    # on re-add and default to True otherwise
    assert not op.readd or known_node is not None, \
      "Can't retrieve locked node %s" % name
    for flag in self._NFLAGS:
      if getattr(op, flag) is not None:
        continue
      if op.readd:
        fallback = getattr(known_node, flag)
      else:
        fallback = True
      setattr(op, flag, fallback)

    if op.readd and not op.vm_capable:
      (primary_insts, secondary_insts) = \
        self.cfg.GetNodeInstances(known_node.uuid)
      if primary_insts or secondary_insts:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % name,
                                   errors.ECODE_STATE)

    # the new node must be single- or dual-homed exactly like the master
    master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_single = master.secondary_ip == master.primary_ip
    new_single = secondary_ip == primary_ip
    if master_single and not new_single:
      raise errors.OpPrereqError("The master has no secondary ip but the"
                                 " new node has one",
                                 errors.ECODE_INVAL)
    if new_single and not master_single:
      raise errors.OpPrereqError("The master has a secondary ip but the"
                                 " new node doesn't have one",
                                 errors.ECODE_INVAL)

    # reachability of the primary address
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    # reachability of the secondary address, from the master's secondary one
    if not new_single and \
        not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                             source=master.secondary_ip):
      raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                 " based ping to node daemon port",
                                 errors.ECODE_ENVIRON)

    if op.readd:
      skipped = [known_node.uuid]
    else:
      skipped = []

    if op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=skipped)
    else:
      self.master_candidate = False

    if op.readd:
      self.new_node = known_node
    else:
      group_uuid = self.cfg.LookupNodeGroup(op.group)
      self.new_node = objects.Node(name=name,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=group_uuid, ndparams={})

    if op.ndparams:
      utils.ForceDictType(op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(op.hv_state, None)

    if op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    runner = rpc.DnsOnlyRunner()
    version_reply = runner.call_version([name])[name]
    version_reply.Raise("Can't get version information from node %s" % name)
    if constants.PROTOCOL_VERSION != version_reply.payload:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION,
                                  version_reply.payload),
                                 errors.ECODE_ENVIRON)
    logging.info("Communication to node %s fine, sw version %s match",
                 name, version_reply.payload)

    vg_name = self.cfg.GetVGName()
    if vg_name is None:
      return

    # with a volume group configured, also check the node's PVs
    verify_params = {constants.NV_PVLIST: [vg_name]}
    exclusive_storage = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
    cluster_name = self.cfg.GetClusterName()
    pv_reply = runner.call_node_verify_light(
      [name], verify_params, cluster_name,
      self.cfg.GetClusterInfo().hvparams)[name]
    (problems, _) = CheckNodePVs(pv_reply.payload, exclusive_storage)
    if problems:
      raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                 "; ".join(problems), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    node = self.new_node

    # a node that is being added is assumed to be powered on
    node.powered = True

    if self.op.readd:
      # The flags are cleared right away because a set offline flag would
      # prevent the RPC calls made further down; the downside is that a
      # failed re-add leaves behind a broken node that is not marked offline
      node.offline = False
      node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # a possible demotion is cleaned up at the end of this method
      node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        node.primary_ip = self.op.primary_ip

    # take over the master/vm_capable flags from the opcode
    for flag in self._NFLAGS:
      setattr(node, flag, getattr(self.op, flag))

    # tell the user about a possible promotion
    if node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    node.ndparams = self.op.ndparams or {}

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_uuid = self.cfg.GetMasterNode()
      hosts_reply = self.rpc.call_etc_hosts_modify(
        master_uuid, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
      hosts_reply.Raise("Can't update hosts file with new host data")

    if node.secondary_ip != node.primary_ip:
      _CheckNodeHasSecondaryIP(self, node, node.secondary_ip, False)

    verifier_uuids = [self.cfg.GetMasterNode()]
    checks = {
      constants.NV_NODELIST: ([node.name], {}),
      # TODO: do a node-net-test as well?
      }

    replies = self.rpc.call_node_verify(
      verifier_uuids, checks,
      self.cfg.GetClusterName(),
      self.cfg.GetClusterInfo().hvparams)
    for verifier in verifier_uuids:
      reply = replies[verifier]
      reply.Raise("Cannot communicate with node %s" % verifier)
      failures = reply.payload[constants.NV_NODELIST]
      if not failures:
        continue
      for failed in failures:
        feedback_fn("ssh/hostname verification failed"
                    " (checking from %s): %s" %
                    (verifier, failures[failed]))
      raise errors.OpExecError("ssh/hostname verification failed")

    if not self.op.readd:
      self.context.AddNode(node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
      return

    self.context.ReaddNode(node)
    RedistributeAncillaryFiles(self)
    # make sure we redistribute the config
    self.cfg.Update(node, feedback_fn)
    # and make sure the new node will not have old files around
    if not node.master_candidate:
      demotion = self.rpc.call_node_demote_from_mc(node.uuid)
      demotion.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
392 393
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  The three role flags (master candidate, drained, offline) are handled as
  one "role": only the flag combinations listed in L{_F2R} are accepted.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  # inverse mapping of _F2R
  _R2F = dict((v, k) for k, v in _F2R.items())
  # same order as the flag tuples used as keys of _F2R
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    """Resolve the target node and validate the requested modifications.

    Besides validation this computes C{self.might_demote},
    C{self.lock_all} and C{self.lock_instances}, which are used by the
    later phases of this LU.

    @raise errors.OpPrereqError: if no modification was passed, if more
        than one of the checked values is True, or if the secondary IP is
        not a valid IPv4 address

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    # NOTE(review): all_mods also holds master_capable and vm_capable, so
    # setting both of those to True trips this check as well - confirm that
    # this is intended
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    # all nodes get locked only if a demotion may have to be compensated by
    # an automatic promotion
    self.lock_all = self.op.auto_promote and self.might_demote
    # instance locks are requested whenever a secondary IP was passed
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    @param instance: the instance object to examine
    @return: whether the instance's disk template is one of
        C{constants.DTS_INT_MIRROR} and the target node is among the
        instance's nodes

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    """Declare the locks needed by this LU.

    Depending on C{self.lock_all} either all nodes (plus the node
    allocation lock) or only the target node are requested; instance locks
    are added when C{self.lock_instances} is set.

    """
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    # (presumably a share_locks value of 0 requests the lock exclusively -
    # confirm against the locking module)
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      # optimistic choice of instances; CheckPrereq compares it again with
      # the instances matching the filter at that time
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    @return: dictionary with the target node name and the string form of
        the requested role and capability flags

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the master node and the target node, for both the pre and
        the post phase

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    The checks are, in this order:
      - the owned instance locks cover all instances affected by a
        secondary IP change
      - role flags are not changed on the master node
      - master candidate/vm_capable requests fit the node's capabilities
        and the instances it holds
      - enough master candidates remain after a demotion without
        auto-promotion
      - power state restrictions related to out-of-band support
      - computation of the new role (stored as C{self.new_role}), including
        automatic promotion and demotion
      - a node leaving the offline role answers a version RPC
      - single-homed/multi-homed restrictions and reachability for a new
        secondary IP
      - validation of ndparams, hv_state and disk_state

    Note that C{self.op.master_candidate} and ineffective flag requests in
    C{self.op} are rewritten along the way.

    @raise errors.OpPrereqError: if any of the checks fails

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    # remember the current flags and role; Exec reports changes against them
    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      # a node may only leave the offline state if it is recorded as powered
      # or is marked as powered by this very request
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      # affected_instances is not None here: lock_instances is set whenever
      # a secondary IP was passed
      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      # the merged parameters are type-checked, but only the newly passed
      # ones are checked against the global-only parameters
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    @param feedback_fn: passed on to the configuration update
    @rtype: list
    @return: (parameter name, new value) pairs for the capability flags,
        role flags and secondary IP that were changed

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        # NOTE(review): this RPC is addressed by node name, whereas
        # LUNodeAdd.Exec passes the node UUID to the same call - confirm
        # that both forms are accepted by the RPC layer
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      # report only the flags that actually differ
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
762 763
class LUNodePowercycle(NoHooksLU):
  """Logical unit that powercycles a single node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolve the node and refuse to act on the master unless forced.

    """
    resolved = ExpandNodeUuidAndName(self.cfg, self.op.node_uuid,
                                     self.op.node_name)
    (self.op.node_uuid, self.op.node_name) = resolved

    targets_master = self.op.node_uuid == self.cfg.GetMasterNode()
    if targets_master and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    No locks are taken: this operation is meant as a last resort and must
    not wait for other jobs.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    @return: the payload of the powercycle RPC

    """
    hv_name = self.cfg.GetHypervisorType()
    # cluster-level parameters of the hypervisor returned above
    hv_params = self.cfg.GetClusterInfo().hvparams[hv_name]
    reply = self.rpc.call_node_powercycle(self.op.node_uuid, hv_name,
                                          hv_params)
    reply.Raise("Failed to schedule the reboot")
    return reply.payload
799 800
def _GetNodeInstancesInner(cfg, fn):
  """Return the configured instances accepted by a predicate.

  @param cfg: object providing C{GetAllInstancesInfo()}, whose values are
      examined
  @param fn: callable taking one instance; a true result selects it
  @rtype: list
  @return: the selected instances

  """
  selected = []
  for inst in cfg.GetAllInstancesInfo().values():
    if fn(inst):
      selected.append(inst)
  return selected
803 804
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Return the instances whose primary node is the given node.

  @param cfg: the configuration to search
  @param node_uuid: UUID compared with each instance's C{primary_node}

  """
  def _IsPrimaryHere(inst):
    return node_uuid == inst.primary_node

  return _GetNodeInstancesInner(cfg, _IsPrimaryHere)
811 812
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Return the instances having the given node as a secondary node.

  @param cfg: the configuration to search
  @param node_uuid: UUID looked up in each instance's C{secondary_nodes}

  """
  def _IsSecondaryHere(inst):
    return node_uuid in inst.secondary_nodes

  return _GetNodeInstancesInner(cfg, _IsSecondaryHere)
819 820
def _GetNodeInstances(cfg, node_uuid):
  """Return every instance using the given node, primary or secondary.

  @param cfg: the configuration to search
  @param node_uuid: UUID looked up in each instance's C{all_nodes}

  """
  def _UsesNode(inst):
    return node_uuid in inst.all_nodes

  return _GetNodeInstancesInner(cfg, _UsesNode)
827 828
829 -class LUNodeEvacuate(NoHooksLU):
830 """Evacuates instances off a list of nodes. 831 832 """ 833 REQ_BGL = False 834 835 _MODE2IALLOCATOR = { 836 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, 837 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, 838 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, 839 } 840 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES 841 assert (frozenset(_MODE2IALLOCATOR.values()) == 842 constants.IALLOCATOR_NEVAC_MODES) 843
def CheckArguments(self):
  """Validate the target selection arguments.

  The check itself is delegated to C{CheckIAllocatorOrNode}, which is
  given the names of the opcode slots "iallocator" and "remote_node".

  """
  CheckIAllocatorOrNode(self, "iallocator", "remote_node")
846
def ExpandNames(self):
  """Resolve node names and prepare the lock declarations.

  @raise errors.OpPrereqError: if a remote node was given and it is the
      evacuated node itself, or the mode is not secondary-only evacuation

  """
  op = self.op
  (op.node_uuid, op.node_name) = \
    ExpandNodeUuidAndName(self.cfg, op.node_uuid, op.node_name)

  if op.remote_node is not None:
    (op.remote_node_uuid, op.remote_node) = \
      ExpandNodeUuidAndName(self.cfg, op.remote_node_uuid, op.remote_node)
    assert op.remote_node

    if op.node_uuid == op.remote_node_uuid:
      raise errors.OpPrereqError("Can not use evacuated node as a new"
                                 " secondary node", errors.ECODE_INVAL)

    if op.mode != constants.NODE_EVAC_SEC:
      raise errors.OpPrereqError("Without the use of an iallocator only"
                                 " secondary instances can be evacuated",
                                 errors.ECODE_INVAL)

  # Declare locks; every level starts out with its own empty list
  self.share_locks = ShareAll()
  lock_levels = (locking.LEVEL_INSTANCE, locking.LEVEL_NODEGROUP,
                 locking.LEVEL_NODE)
  self.needed_locks = dict((level, []) for level in lock_levels)

  # Determine nodes (via group) optimistically, needs verification once locks
  # have been acquired
  self.lock_nodes = self._DetermineNodes()
877
878 - def _DetermineNodes(self):
879 """Gets the list of node UUIDs to operate on. 880 881 """ 882 if self.op.remote_node is None: 883 # Iallocator will choose any node(s) in the same group 884 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) 885 else: 886 group_nodes = frozenset([self.op.remote_node_uuid]) 887 888 # Determine nodes to be locked 889 return set([self.op.node_uuid]) | group_nodes
890
891 - def _DetermineInstances(self):
892 """Builds list of instances to operate on. 893 894 """ 895 assert self.op.mode in constants.NODE_EVAC_MODES 896 897 if self.op.mode == constants.NODE_EVAC_PRI: 898 # Primary instances only 899 inst_fn = _GetNodePrimaryInstances 900 assert self.op.remote_node is None, \ 901 "Evacuating primary instances requires iallocator" 902 elif self.op.mode == constants.NODE_EVAC_SEC: 903 # Secondary instances only 904 inst_fn = _GetNodeSecondaryInstances 905 else: 906 # All instances 907 assert self.op.mode == constants.NODE_EVAC_ALL 908 inst_fn = _GetNodeInstances 909 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 910 # per instance 911 raise errors.OpPrereqError("Due to an issue with the iallocator" 912 " interface it is not possible to evacuate" 913 " all instances at once; specify explicitly" 914 " whether to evacuate primary or secondary" 915 " instances", 916 errors.ECODE_INVAL) 917 918 return inst_fn(self.cfg, self.op.node_uuid)
919
920 - def DeclareLocks(self, level):
921 if level == locking.LEVEL_INSTANCE: 922 # Lock instances optimistically, needs verification once node and group 923 # locks have been acquired 924 self.needed_locks[locking.LEVEL_INSTANCE] = \ 925 set(i.name for i in self._DetermineInstances()) 926 927 elif level == locking.LEVEL_NODEGROUP: 928 # Lock node groups for all potential target nodes optimistically, needs 929 # verification once nodes have been acquired 930 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 931 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 932 933 elif level == locking.LEVEL_NODE: 934 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
935
936 - def CheckPrereq(self):
937 # Verify locks 938 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 939 owned_nodes = self.owned_locks(locking.LEVEL_NODE) 940 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 941 942 need_nodes = self._DetermineNodes() 943 944 if not owned_nodes.issuperset(need_nodes): 945 raise errors.OpPrereqError("Nodes in same group as '%s' changed since" 946 " locks were acquired, current nodes are" 947 " are '%s', used to be '%s'; retry the" 948 " operation" % 949 (self.op.node_name, 950 utils.CommaJoin(need_nodes), 951 utils.CommaJoin(owned_nodes)), 952 errors.ECODE_STATE) 953 954 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) 955 if owned_groups != wanted_groups: 956 raise errors.OpExecError("Node groups changed since locks were acquired," 957 " current groups are '%s', used to be '%s';" 958 " retry the operation" % 959 (utils.CommaJoin(wanted_groups), 960 utils.CommaJoin(owned_groups))) 961 962 # Determine affected instances 963 self.instances = self._DetermineInstances() 964 self.instance_names = [i.name for i in self.instances] 965 966 if set(self.instance_names) != owned_instance_names: 967 raise errors.OpExecError("Instances on node '%s' changed since locks" 968 " were acquired, current instances are '%s'," 969 " used to be '%s'; retry the operation" % 970 (self.op.node_name, 971 utils.CommaJoin(self.instance_names), 972 utils.CommaJoin(owned_instance_names))) 973 974 if self.instance_names: 975 self.LogInfo("Evacuating instances from node '%s': %s", 976 self.op.node_name, 977 utils.CommaJoin(utils.NiceSort(self.instance_names))) 978 else: 979 self.LogInfo("No instances to evacuate from node '%s'", 980 self.op.node_name) 981 982 if self.op.remote_node is not None: 983 for i in self.instances: 984 if i.primary_node == self.op.remote_node_uuid: 985 raise errors.OpPrereqError("Node %s is the primary node of" 986 " instance %s, cannot use it as" 987 " secondary" % 988 (self.op.remote_node, i.name), 989 errors.ECODE_INVAL)
990
991 - def Exec(self, feedback_fn):
992 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 993 994 if not self.instance_names: 995 # No instances to evacuate 996 jobs = [] 997 998 elif self.op.iallocator is not None: 999 # TODO: Implement relocation to other group 1000 evac_mode = self._MODE2IALLOCATOR[self.op.mode] 1001 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode, 1002 instances=list(self.instance_names)) 1003 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 1004 1005 ial.Run(self.op.iallocator) 1006 1007 if not ial.success: 1008 raise errors.OpPrereqError("Can't compute node evacuation using" 1009 " iallocator '%s': %s" % 1010 (self.op.iallocator, ial.info), 1011 errors.ECODE_NORES) 1012 1013 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1014 1015 elif self.op.remote_node is not None: 1016 assert self.op.mode == constants.NODE_EVAC_SEC 1017 jobs = [ 1018 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1019 remote_node=self.op.remote_node, 1020 disks=[], 1021 mode=constants.REPLACE_DISK_CHG, 1022 early_release=self.op.early_release)] 1023 for instance_name in self.instance_names] 1024 1025 else: 1026 raise errors.ProgrammerError("No iallocator or remote node") 1027 1028 return ResultWithJobs(jobs)
1029 1030
class LUNodeMigrate(LogicalUnit):
  """Migrate all primary instances away from a node.

  The LU only builds one migration job per instance; the migrations
  themselves are done by the submitted L{opcodes.OpInstanceMigrate} jobs.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    """No argument checks are done by this LU."""
    pass

  def ExpandNames(self):
    """Resolves the node and declares a shared lock on it."""
    expanded = ExpandNodeUuidAndName(self.cfg, self.op.node_uuid,
                                     self.op.node_name)
    (self.op.node_uuid, self.op.node_name) = expanded

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: dictionary with the node name and the C{allow_runtime_changes}
        opcode parameter

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: tuple of (pre, post) node lists, both containing only the
        master node

    """
    master_only = [self.cfg.GetMasterNode()]
    return (master_only, master_only)

  def CheckPrereq(self):
    """No prerequisites are checked by this LU."""
    pass

  def Exec(self, feedback_fn):
    """Builds the migration jobs for the node's primary instances.

    @param feedback_fn: feedback function (not used here)
    @return: L{ResultWithJobs} with one single-opcode job per instance

    """
    # Prepare jobs for migration instances
    jobs = []
    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid):
      migrate_op = opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)
      jobs.append([migrate_op])

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
    assert owned_nodes == frozenset([self.op.node_uuid])

    return ResultWithJobs(jobs)
1094 1095
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the extra arguments needed for a storage type.

  @param cfg: cluster configuration, queried for the storage directories
  @param storage_type: storage type to build the arguments for
  @return: empty list for everything but L{constants.ST_FILE}; for file
      storage a one-element list holding the list of storage directories

  """
  if storage_type != constants.ST_FILE:
    return []

  # Special case for file storage: storage.FileStorage wants a list of
  # storage directories
  storage_dirs = [cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]
  return [storage_dirs]
1106 1107
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolves the node and checks that the changes are permitted.

    @raise errors.OpPrereqError: if the storage type has no modifiable
        fields at all, or if a field outside the modifiable set is to be
        changed

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    # Anything requested beyond the modifiable fields is an error
    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    The check of the storage type against the cluster is delegated to
    L{CheckStorageTypeEnabled}.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    """Declares the lock on the node owning the storage unit."""
    # Node locks are declared as a list, like in the other single-node LUs of
    # this module
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def Exec(self, feedback_fn):
    """Applies the requested changes to the storage unit.

    @param feedback_fn: feedback function (not used here)

    A failure of the RPC call is reported through the result's C{Raise}
    method.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
1155 1156
class NodeQuery(QueryBase):
  """Query helper gathering the data for node queries.

  """
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    """Determines the wanted nodes and declares locks on the given LU.

    @param lu: logical unit on whose behalf the query runs; its
        C{needed_locks} and C{share_locks} are set here

    """
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if not self.names:
      self.wanted = locking.ALL_SET
    else:
      (self.wanted, _) = GetWantedNodes(lu, self.names)

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if not self.do_locking:
      return

    # If any non-static field is requested we need to lock the nodes
    lu.needed_locks[locking.LEVEL_NODE] = self.wanted
    lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    """Nothing to declare, all locks are set up in L{ExpandNames}."""
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    Only the data categories listed in C{self.requested_data} are gathered;
    the others are passed on as C{None} (or an empty dictionary for the
    node groups).

    @param lu: logical unit providing configuration and RPC access
    @return: L{query.NodeQueryData} object

    """
    cfg = lu.cfg
    requested = self.requested_data

    all_info = cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    live_data = None
    if query.NQ_LIVE in requested:
      # filter out non-vm_capable nodes
      toquery_node_uuids = []
      for node in all_info.values():
        if node.vm_capable and node.uuid in node_uuids:
          toquery_node_uuids.append(node.uuid)

      lvm_enabled = utils.storage.IsLvmEnabled(
        cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
        cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
        cfg, raw_storage_units, toquery_node_uuids)

      default_hypervisor = cfg.GetHypervisorType()
      hvparams = cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]

      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)

      # Only keep results which neither failed nor came back empty
      live_data = {}
      for (uuid, nresult) in node_data.items():
        if not nresult.fail_msg and nresult.payload:
          live_data[uuid] = \
            rpc.MakeLegacyNodeInfo(nresult.payload,
                                   require_spindles=lvm_enabled)

    node_to_primary = None
    node_to_secondary = None
    inst_uuid_to_inst_name = None
    if query.NQ_INST in requested:
      node_to_primary = dict((uuid, set()) for uuid in node_uuids)
      node_to_secondary = dict((uuid, set()) for uuid in node_uuids)

      inst_data = cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name

        # Instances on nodes outside of the query are not recorded
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)

        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)

    oob_support = None
    if query.NQ_OOB in requested:
      oob_support = dict((uuid, bool(SupportsOob(cfg, node)))
                         for uuid, node in all_info.iteritems())

    if query.NQ_GROUP in requested:
      groups = cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    wanted_nodes = [all_info[uuid] for uuid in node_uuids]
    master_node = cfg.GetMasterNode()
    cluster = cfg.GetClusterInfo()

    return query.NodeQueryData(wanted_nodes, live_data, master_node,
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               cluster)
1250 1251
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  All the work is forwarded to a L{NodeQuery} object created in
  L{CheckArguments}.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    """Creates the query object from the opcode parameters."""
    name_filter = qlang.MakeSimpleFilter("name", self.op.names)
    self.nq = NodeQuery(name_filter, self.op.output_fields,
                        self.op.use_locking)

  def ExpandNames(self):
    """Forwards to the query object."""
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    """Forwards to the query object.

    @param level: locking level being declared

    """
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    """Runs the query.

    @param feedback_fn: feedback function (not used here)
    @return: the result of the query object's C{OldStyleQuery} method

    """
    return self.nq.OldStyleQuery(self)
1271 1272
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the fields to be checked against both sets
  @raise errors.OpPrereqError: if a selected field matches neither set

  """
  known_fields = utils.FieldSet()
  for fieldset in (static, dynamic):
    known_fields.Extend(fieldset)

  unknown = known_fields.NonMatching(selected)
  if not unknown:
    return

  raise errors.OpPrereqError("Unknown output fields selected: %s"
                             % ",".join(unknown), errors.ECODE_INVAL)
1290 1291
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    """Verifies that only known output fields were selected."""
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    """Declares shared locks on the given nodes, or on all of them."""
    self.share_locks = ShareAll()

    if not self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }

  def _GetFieldValue(self, field, node_uuid, vol, vol2inst):
    """Computes the value of a single output field for one volume.

    @param field: name of the output field
    @param node_uuid: UUID of the node the volume was reported by
    @param vol: volume dictionary as found in the RPC payload
    @param vol2inst: mapping from (node UUID, "vg/name") to instance
    @return: the raw (not yet stringified) field value
    @raise errors.ParameterError: for a field name not handled here

    """
    if field == "node":
      return self.cfg.GetNodeName(node_uuid)
    if field == "phys":
      return vol["dev"]
    if field == "vg":
      return vol["vg"]
    if field == "name":
      return vol["name"]
    if field == "size":
      return int(float(vol["size"]))
    if field == "instance":
      inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                          None)
      if inst is None:
        # Volume not known to belong to any instance
        return "-"
      return inst.name

    raise errors.ParameterError(field)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    Offline nodes are skipped silently, nodes whose RPC call failed are
    skipped with a warning.

    @param feedback_fn: feedback function (not used here)
    @return: list of rows, one per volume, each holding the stringified
        values of the selected output fields

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      # Volumes are reported ordered by their device
      for vol in sorted(nresult.payload, key=operator.itemgetter("dev")):
        output.append([str(self._GetFieldValue(field, node_uuid, vol,
                                               vol2inst))
                       for field in self.op.output_fields])

    return output
1369 1370
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    """Verifies that only known output fields were selected."""
    dynamic_fields = utils.FieldSet(*constants.VALID_STORAGE_FIELDS)
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=dynamic_fields,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    """Declares shared locks on the given nodes, or on all of them."""
    self.share_locks = ShareAll()

    if not self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }

  def CheckPrereq(self):
    """Check prerequisites.

    The check of the storage type against the cluster is delegated to
    L{CheckStorageTypeEnabled}.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    Offline nodes are skipped silently, nodes whose RPC call failed are
    skipped with a warning.

    @param feedback_fn: feedback function (not used here)
    @return: list of rows, one per storage unit, each holding the values of
        the selected output fields

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
    wanted_fields = self.op.output_fields

    # Always get name to sort by
    if constants.SF_NAME in wanted_fields:
      fields = wanted_fields[:]
    else:
      fields = [constants.SF_NAME] + wanted_fields

    # Never ask for node or type as it's only known to the LU
    lu_only = (constants.SF_NODE, constants.SF_TYPE)
    fields = [name for name in fields if name not in lu_only]

    field_idx = dict((name, idx) for (idx, name) in enumerate(fields))
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      # Index the returned rows by unit name to output them sorted
      rows = dict((row[name_idx], row) for row in nresult.payload)

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]
        out = []

        for field in wanted_fields:
          if field == constants.SF_NODE:
            out.append(node_name)
          elif field == constants.SF_TYPE:
            out.append(self.op.storage_type)
          elif field in field_idx:
            out.append(row[field_idx[field]])
          else:
            raise errors.ParameterError(field)

        result.append(out)

    return result
1463 1464
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: dictionary naming the node to be removed

    """
    node_name = self.op.node_name
    return {
      "OP_TARGET": node_name,
      "NODE_NAME": node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    @return: tuple of (pre, post) node lists, both being all nodes except
        the one to be removed

    """
    hook_nodes = self.cfg.GetNodeList()
    try:
      hook_nodes.remove(self.op.node_uuid)
    except ValueError:
      # Node is not in the list, nothing to exclude
      pass
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    expanded = ExpandNodeUuidAndName(self.cfg, self.op.node_uuid,
                                     self.op.node_name)
    (self.op.node_uuid, self.op.node_name) = expanded

    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    if node.uuid == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance in self.cfg.GetAllInstancesInfo().values():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)

    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    @param feedback_fn: feedback function (not used here)

    """
    node = self.node

    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[node.uuid])
    self.context.RemoveNode(node)

    # Run the post hooks on the node being removed
    RunPostHook(self, node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    leave_result = self.rpc.call_node_leave_cluster(node.name,
                                                    modify_ssh_setup)
    if leave_result.fail_msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", leave_result.fail_msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      hosts_result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                                    constants.ETC_HOSTS_REMOVE,
                                                    node.name, None)
      hosts_result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1559 1560
class LURepairNodeStorage(NoHooksLU):
  """Repairs a storage unit on a node.

  The repair is done by running the L{constants.SO_FIX_CONSISTENCY}
  storage operation on the node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolves the node and checks that the storage type can be repaired.

    @raise errors.OpPrereqError: if the storage type does not support the
        L{constants.SO_FIX_CONSISTENCY} operation

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type
    valid_ops = constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])

    if constants.SO_FIX_CONSISTENCY not in valid_ops:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declares the lock on the node to be repaired."""
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn.

    @param instance: instance whose disks are checked
    @param node_uuid: UUID of the node to check the disks on
    @raise errors.OpPrereqError: if faulty disks were found (or the check
        itself raised this error) and C{ignore_consistency} is not set

    """
    try:
      has_faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                           node_uuid, True)
      if has_faulty:
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      if not self.op.ignore_consistency:
        raise
      # Downgraded to a warning on explicit request
      self.LogWarning(str(err.args[0]))

  def CheckPrereq(self):
    """Check prerequisites.

    Besides the storage type check, the disks of all instances using the
    node are checked on the instances' other nodes.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if inst.disks_active:
        # Only the other nodes of the instance are looked at
        check_nodes = set(inst.all_nodes)
        check_nodes.discard(self.op.node_uuid)
        for inst_node_uuid in check_nodes:
          self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Runs the repair operation on the storage unit.

    @param feedback_fn: function used to report the start of the repair

    A failure of the RPC call is reported through the result's C{Raise}
    method.

    """
    unit_name = self.op.name
    node_name = self.op.node_name

    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (unit_name, node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           unit_name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (unit_name, node_name))
1626