
Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Logical units dealing with nodes.""" 
  23   
  24  import logging 
  25  import operator 
  26   
  27  from ganeti import constants 
  28  from ganeti import errors 
  29  from ganeti import locking 
  30  from ganeti import netutils 
  31  from ganeti import objects 
  32  from ganeti import opcodes 
  33  from ganeti import qlang 
  34  from ganeti import query 
  35  from ganeti import rpc 
  36  from ganeti import utils 
  37  from ganeti.masterd import iallocator 
  38   
  39  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \ 
  40    ResultWithJobs 
  41  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  42    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  43    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  44    RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \ 
  45    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  46    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  47    GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \ 
  48    FindFaultyInstanceDisks 
  49   
  50   
51 -def _DecideSelfPromotion(lu, exceptions=None):
52 """Decide whether I should promote myself as a master candidate. 53 54 """ 55 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size 56 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) 57 # the new node will increase mc_max with one, so: 58 mc_should = min(mc_should + 1, cp_size) 59 return mc_now < mc_should
60 61
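# --- Illustrative sketch (editorial addition, not part of this module) ----
# A standalone rendering of the promotion decision above: a freshly added
# node promotes itself only while the candidate pool is not yet full.  The
# function and argument names below are toy stand-ins, not Ganeti APIs.
def _should_promote(candidate_pool_size, mc_now, mc_should):
  """Return True if the node being added should become a master candidate."""
  # adding the node raises the achievable candidate count by one, capped at
  # the configured pool size
  mc_should = min(mc_should + 1, candidate_pool_size)
  return mc_now < mc_should

# Pool of 10 with 3 candidates: promote.  Pool of 3 already full: don't.
assert _should_promote(candidate_pool_size=10, mc_now=3, mc_should=3)
assert not _should_promote(candidate_pool_size=3, mc_now=3, mc_should=3)
# ---------------------------------------------------------------------------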
62 -def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63 """Ensure that a node has the given secondary ip. 64 65 @type lu: L{LogicalUnit} 66 @param lu: the LU on behalf of which we make the check 67 @type node: string 68 @param node: the node to check 69 @type secondary_ip: string 70 @param secondary_ip: the ip to check 71 @type prereq: boolean 72 @param prereq: whether to throw a prerequisite or an execute error 73 @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True 74 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False 75 76 """ 77 result = lu.rpc.call_node_has_ip_address(node, secondary_ip) 78 result.Raise("Failure checking secondary ip on node %s" % node, 79 prereq=prereq, ecode=errors.ECODE_ENVIRON) 80 if not result.payload: 81 msg = ("Node claims it doesn't have the secondary ip you gave (%s)," 82 " please fix and re-run this command" % secondary_ip) 83 if prereq: 84 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) 85 else: 86 raise errors.OpExecError(msg)
87 88
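# --- Illustrative sketch (editorial addition, not part of this module) ----
# The helper above reports the same failure differently depending on the
# phase: as a prerequisite error from CheckPrereq (nothing changed yet) or
# as an execution error from Exec.  The same pattern with plain exceptions
# (toy class names, not Ganeti APIs):
class PrereqError(Exception):
  """Raised while checking prerequisites, before any change was made."""

class ExecError(Exception):
  """Raised while executing, after changes may already have happened."""

def _fail(msg, prereq):
  # Callers pass prereq=True from the prerequisite phase and False from Exec.
  if prereq:
    raise PrereqError(msg)
  raise ExecError(msg)
# ---------------------------------------------------------------------------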
89 -class LUNodeAdd(LogicalUnit):
90 """Logical unit for adding node to the cluster. 91 92 """ 93 HPATH = "node-add" 94 HTYPE = constants.HTYPE_NODE 95 _NFLAGS = ["master_capable", "vm_capable"] 96
97 - def CheckArguments(self):
98 self.primary_ip_family = self.cfg.GetPrimaryIPFamily() 99 # validate/normalize the node name 100 self.hostname = netutils.GetHostname(name=self.op.node_name, 101 family=self.primary_ip_family) 102 self.op.node_name = self.hostname.name 103 104 if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): 105 raise errors.OpPrereqError("Cannot readd the master node", 106 errors.ECODE_STATE) 107 108 if self.op.readd and self.op.group: 109 raise errors.OpPrereqError("Cannot pass a node group when a node is" 110 " being readded", errors.ECODE_INVAL)
111
112 - def BuildHooksEnv(self):
113 """Build hooks env. 114 115 This will run on all nodes before, and on all nodes + the new node after. 116 117 """ 118 return { 119 "OP_TARGET": self.op.node_name, 120 "NODE_NAME": self.op.node_name, 121 "NODE_PIP": self.op.primary_ip, 122 "NODE_SIP": self.op.secondary_ip, 123 "MASTER_CAPABLE": str(self.op.master_capable), 124 "VM_CAPABLE": str(self.op.vm_capable), 125 }
126
127 - def BuildHooksNodes(self):
128 """Build hooks nodes. 129 130 """ 131 # Exclude added node 132 pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name])) 133 post_nodes = pre_nodes + [self.op.node_name, ] 134 135 return (pre_nodes, post_nodes)
136
137 - def CheckPrereq(self):
138 """Check prerequisites. 139 140 This checks: 141 - the new node is not already in the config 142 - it is resolvable 143 - its parameters (single/dual homed) matches the cluster 144 145 Any errors are signaled by raising errors.OpPrereqError. 146 147 """ 148 cfg = self.cfg 149 hostname = self.hostname 150 node = hostname.name 151 primary_ip = self.op.primary_ip = hostname.ip 152 if self.op.secondary_ip is None: 153 if self.primary_ip_family == netutils.IP6Address.family: 154 raise errors.OpPrereqError("When using a IPv6 primary address, a valid" 155 " IPv4 address must be given as secondary", 156 errors.ECODE_INVAL) 157 self.op.secondary_ip = primary_ip 158 159 secondary_ip = self.op.secondary_ip 160 if not netutils.IP4Address.IsValid(secondary_ip): 161 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 162 " address" % secondary_ip, errors.ECODE_INVAL) 163 164 node_list = cfg.GetNodeList() 165 if not self.op.readd and node in node_list: 166 raise errors.OpPrereqError("Node %s is already in the configuration" % 167 node, errors.ECODE_EXISTS) 168 elif self.op.readd and node not in node_list: 169 raise errors.OpPrereqError("Node %s is not in the configuration" % node, 170 errors.ECODE_NOENT) 171 172 self.changed_primary_ip = False 173 174 for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list): 175 if self.op.readd and node == existing_node_name: 176 if existing_node.secondary_ip != secondary_ip: 177 raise errors.OpPrereqError("Readded node doesn't have the same IP" 178 " address configuration as before", 179 errors.ECODE_INVAL) 180 if existing_node.primary_ip != primary_ip: 181 self.changed_primary_ip = True 182 183 continue 184 185 if (existing_node.primary_ip == primary_ip or 186 existing_node.secondary_ip == primary_ip or 187 existing_node.primary_ip == secondary_ip or 188 existing_node.secondary_ip == secondary_ip): 189 raise errors.OpPrereqError("New node ip address(es) conflict with" 190 " existing node %s" % existing_node.name, 191 errors.ECODE_NOTUNIQUE) 192 193 # After this 'if' block, None is no longer a valid value for the 194 # _capable op attributes 195 if self.op.readd: 196 old_node = self.cfg.GetNodeInfo(node) 197 assert old_node is not None, "Can't retrieve locked node %s" % node 198 for attr in self._NFLAGS: 199 if getattr(self.op, attr) is None: 200 setattr(self.op, attr, getattr(old_node, attr)) 201 else: 202 for attr in self._NFLAGS: 203 if getattr(self.op, attr) is None: 204 setattr(self.op, attr, True) 205 206 if self.op.readd and not self.op.vm_capable: 207 pri, sec = cfg.GetNodeInstances(node) 208 if pri or sec: 209 raise errors.OpPrereqError("Node %s being re-added with vm_capable" 210 " flag set to false, but it already holds" 211 " instances" % node, 212 errors.ECODE_STATE) 213 214 # check that the type of the node (single versus dual homed) is the 215 # same as for the master 216 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode()) 217 master_singlehomed = myself.secondary_ip == myself.primary_ip 218 newbie_singlehomed = secondary_ip == primary_ip 219 if master_singlehomed != newbie_singlehomed: 220 if master_singlehomed: 221 raise errors.OpPrereqError("The master has no secondary ip but the" 222 " new node has one", 223 errors.ECODE_INVAL) 224 else: 225 raise errors.OpPrereqError("The master has a secondary ip but the" 226 " new node doesn't have one", 227 errors.ECODE_INVAL) 228 229 # checks reachability 230 if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): 231 raise errors.OpPrereqError("Node not 
reachable by ping", 232 errors.ECODE_ENVIRON) 233 234 if not newbie_singlehomed: 235 # check reachability from my secondary ip to newbie's secondary ip 236 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, 237 source=myself.secondary_ip): 238 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 239 " based ping to node daemon port", 240 errors.ECODE_ENVIRON) 241 242 if self.op.readd: 243 exceptions = [node] 244 else: 245 exceptions = [] 246 247 if self.op.master_capable: 248 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) 249 else: 250 self.master_candidate = False 251 252 if self.op.readd: 253 self.new_node = old_node 254 else: 255 node_group = cfg.LookupNodeGroup(self.op.group) 256 self.new_node = objects.Node(name=node, 257 primary_ip=primary_ip, 258 secondary_ip=secondary_ip, 259 master_candidate=self.master_candidate, 260 offline=False, drained=False, 261 group=node_group, ndparams={}) 262 263 if self.op.ndparams: 264 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 265 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 266 "node", "cluster or group") 267 268 if self.op.hv_state: 269 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) 270 271 if self.op.disk_state: 272 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) 273 274 # TODO: If we need to have multiple DnsOnlyRunner we probably should make 275 # it a property on the base class. 276 rpcrunner = rpc.DnsOnlyRunner() 277 result = rpcrunner.call_version([node])[node] 278 result.Raise("Can't get version information from node %s" % node) 279 if constants.PROTOCOL_VERSION == result.payload: 280 logging.info("Communication to node %s fine, sw version %s match", 281 node, result.payload) 282 else: 283 raise errors.OpPrereqError("Version mismatch master version %s," 284 " node version %s" % 285 (constants.PROTOCOL_VERSION, result.payload), 286 errors.ECODE_ENVIRON) 287 288 vg_name = cfg.GetVGName() 289 if vg_name is not None: 290 vparams = {constants.NV_PVLIST: [vg_name]} 291 excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node) 292 cname = self.cfg.GetClusterName() 293 result = rpcrunner.call_node_verify_light([node], vparams, cname)[node] 294 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) 295 if errmsgs: 296 raise errors.OpPrereqError("Checks on node PVs failed: %s" % 297 "; ".join(errmsgs), errors.ECODE_ENVIRON)
298
299 - def Exec(self, feedback_fn):
300 """Adds the new node to the cluster. 301 302 """ 303 new_node = self.new_node 304 node = new_node.name 305 306 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 307 "Not owning BGL" 308 309 # We adding a new node so we assume it's powered 310 new_node.powered = True 311 312 # for re-adds, reset the offline/drained/master-candidate flags; 313 # we need to reset here, otherwise offline would prevent RPC calls 314 # later in the procedure; this also means that if the re-add 315 # fails, we are left with a non-offlined, broken node 316 if self.op.readd: 317 new_node.drained = new_node.offline = False # pylint: disable=W0201 318 self.LogInfo("Readding a node, the offline/drained flags were reset") 319 # if we demote the node, we do cleanup later in the procedure 320 new_node.master_candidate = self.master_candidate 321 if self.changed_primary_ip: 322 new_node.primary_ip = self.op.primary_ip 323 324 # copy the master/vm_capable flags 325 for attr in self._NFLAGS: 326 setattr(new_node, attr, getattr(self.op, attr)) 327 328 # notify the user about any possible mc promotion 329 if new_node.master_candidate: 330 self.LogInfo("Node will be a master candidate") 331 332 if self.op.ndparams: 333 new_node.ndparams = self.op.ndparams 334 else: 335 new_node.ndparams = {} 336 337 if self.op.hv_state: 338 new_node.hv_state_static = self.new_hv_state 339 340 if self.op.disk_state: 341 new_node.disk_state_static = self.new_disk_state 342 343 # Add node to our /etc/hosts, and add key to known_hosts 344 if self.cfg.GetClusterInfo().modify_etc_hosts: 345 master_node = self.cfg.GetMasterNode() 346 result = self.rpc.call_etc_hosts_modify(master_node, 347 constants.ETC_HOSTS_ADD, 348 self.hostname.name, 349 self.hostname.ip) 350 result.Raise("Can't update hosts file with new host data") 351 352 if new_node.secondary_ip != new_node.primary_ip: 353 _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip, 354 False) 355 356 node_verify_list = [self.cfg.GetMasterNode()] 357 node_verify_param = { 358 constants.NV_NODELIST: ([node], {}), 359 # TODO: do a node-net-test as well? 360 } 361 362 result = self.rpc.call_node_verify(node_verify_list, node_verify_param, 363 self.cfg.GetClusterName()) 364 for verifier in node_verify_list: 365 result[verifier].Raise("Cannot communicate with node %s" % verifier) 366 nl_payload = result[verifier].payload[constants.NV_NODELIST] 367 if nl_payload: 368 for failed in nl_payload: 369 feedback_fn("ssh/hostname verification failed" 370 " (checking from %s): %s" % 371 (verifier, nl_payload[failed])) 372 raise errors.OpExecError("ssh/hostname verification failed") 373 374 if self.op.readd: 375 RedistributeAncillaryFiles(self) 376 self.context.ReaddNode(new_node) 377 # make sure we redistribute the config 378 self.cfg.Update(new_node, feedback_fn) 379 # and make sure the new node will not have old files around 380 if not new_node.master_candidate: 381 result = self.rpc.call_node_demote_from_mc(new_node.name) 382 msg = result.fail_msg 383 if msg: 384 self.LogWarning("Node failed to demote itself from master" 385 " candidate status: %s" % msg) 386 else: 387 RedistributeAncillaryFiles(self, additional_nodes=[node], 388 additional_vm=self.op.vm_capable) 389 self.context.AddNode(new_node, self.proc.GetECId())
390 391
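# --- Illustrative sketch (editorial addition, not part of this module) ----
# LUNodeAdd.CheckPrereq requires the new node to have the same "homing" as
# the master: either both use a secondary IP distinct from the primary, or
# neither does.  A condensed, standalone version of that rule (toy names and
# addresses, not Ganeti APIs):
def _same_homing(master_primary, master_secondary, new_primary, new_secondary):
  master_singlehomed = master_secondary == master_primary
  newbie_singlehomed = new_secondary == new_primary
  return master_singlehomed == newbie_singlehomed

# Both single-homed: accepted.  Dual-homed master, single-homed node: rejected.
assert _same_homing("192.0.2.1", "192.0.2.1", "192.0.2.2", "192.0.2.2")
assert not _same_homing("192.0.2.1", "198.51.100.1", "192.0.2.2", "192.0.2.2")
# ---------------------------------------------------------------------------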
392 -class LUNodeSetParams(LogicalUnit):
393 """Modifies the parameters of a node. 394 395 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 396 to the node role (as _ROLE_*) 397 @cvar _R2F: a dictionary from node role to tuples of flags 398 @cvar _FLAGS: a list of attribute names corresponding to the flags 399 400 """ 401 HPATH = "node-modify" 402 HTYPE = constants.HTYPE_NODE 403 REQ_BGL = False 404 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 405 _F2R = { 406 (True, False, False): _ROLE_CANDIDATE, 407 (False, True, False): _ROLE_DRAINED, 408 (False, False, True): _ROLE_OFFLINE, 409 (False, False, False): _ROLE_REGULAR, 410 } 411 _R2F = dict((v, k) for k, v in _F2R.items()) 412 _FLAGS = ["master_candidate", "drained", "offline"] 413
414 - def CheckArguments(self):
415 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 416 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 417 self.op.master_capable, self.op.vm_capable, 418 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 419 self.op.disk_state] 420 if all_mods.count(None) == len(all_mods): 421 raise errors.OpPrereqError("Please pass at least one modification", 422 errors.ECODE_INVAL) 423 if all_mods.count(True) > 1: 424 raise errors.OpPrereqError("Can't set the node into more than one" 425 " state at the same time", 426 errors.ECODE_INVAL) 427 428 # Boolean value that tells us whether we might be demoting from MC 429 self.might_demote = (self.op.master_candidate is False or 430 self.op.offline is True or 431 self.op.drained is True or 432 self.op.master_capable is False) 433 434 if self.op.secondary_ip: 435 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 436 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 437 " address" % self.op.secondary_ip, 438 errors.ECODE_INVAL) 439 440 self.lock_all = self.op.auto_promote and self.might_demote 441 self.lock_instances = self.op.secondary_ip is not None
442
443 - def _InstanceFilter(self, instance):
444 """Filter for getting affected instances. 445 446 """ 447 return (instance.disk_template in constants.DTS_INT_MIRROR and 448 self.op.node_name in instance.all_nodes)
449
450 - def ExpandNames(self):
451 if self.lock_all: 452 self.needed_locks = { 453 locking.LEVEL_NODE: locking.ALL_SET, 454 455 # Block allocations when all nodes are locked 456 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 457 } 458 else: 459 self.needed_locks = { 460 locking.LEVEL_NODE: self.op.node_name, 461 } 462 463 # Since modifying a node can have severe effects on currently running 464 # operations the resource lock is at least acquired in shared mode 465 self.needed_locks[locking.LEVEL_NODE_RES] = \ 466 self.needed_locks[locking.LEVEL_NODE] 467 468 # Get all locks except nodes in shared mode; they are not used for anything 469 # but read-only access 470 self.share_locks = ShareAll() 471 self.share_locks[locking.LEVEL_NODE] = 0 472 self.share_locks[locking.LEVEL_NODE_RES] = 0 473 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 474 475 if self.lock_instances: 476 self.needed_locks[locking.LEVEL_INSTANCE] = \ 477 frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
478
479 - def BuildHooksEnv(self):
480 """Build hooks env. 481 482 This runs on the master node. 483 484 """ 485 return { 486 "OP_TARGET": self.op.node_name, 487 "MASTER_CANDIDATE": str(self.op.master_candidate), 488 "OFFLINE": str(self.op.offline), 489 "DRAINED": str(self.op.drained), 490 "MASTER_CAPABLE": str(self.op.master_capable), 491 "VM_CAPABLE": str(self.op.vm_capable), 492 }
493
494 - def BuildHooksNodes(self):
495 """Build hooks nodes. 496 497 """ 498 nl = [self.cfg.GetMasterNode(), self.op.node_name] 499 return (nl, nl)
500
501 - def CheckPrereq(self):
502 """Check prerequisites. 503 504 This only checks the instance list against the existing names. 505 506 """ 507 node = self.node = self.cfg.GetNodeInfo(self.op.node_name) 508 509 if self.lock_instances: 510 affected_instances = \ 511 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 512 513 # Verify instance locks 514 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) 515 wanted_instances = frozenset(affected_instances.keys()) 516 if wanted_instances - owned_instances: 517 raise errors.OpPrereqError("Instances affected by changing node %s's" 518 " secondary IP address have changed since" 519 " locks were acquired, wanted '%s', have" 520 " '%s'; retry the operation" % 521 (self.op.node_name, 522 utils.CommaJoin(wanted_instances), 523 utils.CommaJoin(owned_instances)), 524 errors.ECODE_STATE) 525 else: 526 affected_instances = None 527 528 if (self.op.master_candidate is not None or 529 self.op.drained is not None or 530 self.op.offline is not None): 531 # we can't change the master's node flags 532 if self.op.node_name == self.cfg.GetMasterNode(): 533 raise errors.OpPrereqError("The master role can be changed" 534 " only via master-failover", 535 errors.ECODE_INVAL) 536 537 if self.op.master_candidate and not node.master_capable: 538 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 539 " it a master candidate" % node.name, 540 errors.ECODE_STATE) 541 542 if self.op.vm_capable is False: 543 (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name) 544 if ipri or isec: 545 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 546 " the vm_capable flag" % node.name, 547 errors.ECODE_STATE) 548 549 if node.master_candidate and self.might_demote and not self.lock_all: 550 assert not self.op.auto_promote, "auto_promote set but lock_all not" 551 # check if after removing the current node, we're missing master 552 # candidates 553 (mc_remaining, mc_should, _) = \ 554 self.cfg.GetMasterCandidateStats(exceptions=[node.name]) 555 if mc_remaining < mc_should: 556 raise errors.OpPrereqError("Not enough master candidates, please" 557 " pass auto promote option to allow" 558 " promotion (--auto-promote or RAPI" 559 " auto_promote=True)", errors.ECODE_STATE) 560 561 self.old_flags = old_flags = (node.master_candidate, 562 node.drained, node.offline) 563 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 564 self.old_role = old_role = self._F2R[old_flags] 565 566 # Check for ineffective changes 567 for attr in self._FLAGS: 568 if (getattr(self.op, attr) is False and getattr(node, attr) is False): 569 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 570 setattr(self.op, attr, None) 571 572 # Past this point, any flag change to False means a transition 573 # away from the respective state, as only real changes are kept 574 575 # TODO: We might query the real power state if it supports OOB 576 if SupportsOob(self.cfg, node): 577 if self.op.offline is False and not (node.powered or 578 self.op.powered is True): 579 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 580 " offline status can be reset") % 581 self.op.node_name, errors.ECODE_STATE) 582 elif self.op.powered is not None: 583 raise errors.OpPrereqError(("Unable to change powered state for node %s" 584 " as it does not support out-of-band" 585 " handling") % self.op.node_name, 586 errors.ECODE_STATE) 587 588 # If we're being deofflined/drained, we'll MC ourself if needed 589 if (self.op.drained is False or self.op.offline is False or 
590 (self.op.master_capable and not node.master_capable)): 591 if _DecideSelfPromotion(self): 592 self.op.master_candidate = True 593 self.LogInfo("Auto-promoting node to master candidate") 594 595 # If we're no longer master capable, we'll demote ourselves from MC 596 if self.op.master_capable is False and node.master_candidate: 597 self.LogInfo("Demoting from master candidate") 598 self.op.master_candidate = False 599 600 # Compute new role 601 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 602 if self.op.master_candidate: 603 new_role = self._ROLE_CANDIDATE 604 elif self.op.drained: 605 new_role = self._ROLE_DRAINED 606 elif self.op.offline: 607 new_role = self._ROLE_OFFLINE 608 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 609 # False is still in new flags, which means we're un-setting (the 610 # only) True flag 611 new_role = self._ROLE_REGULAR 612 else: # no new flags, nothing, keep old role 613 new_role = old_role 614 615 self.new_role = new_role 616 617 if old_role == self._ROLE_OFFLINE and new_role != old_role: 618 # Trying to transition out of offline status 619 result = self.rpc.call_version([node.name])[node.name] 620 if result.fail_msg: 621 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 622 " to report its version: %s" % 623 (node.name, result.fail_msg), 624 errors.ECODE_STATE) 625 else: 626 self.LogWarning("Transitioning node from offline to online state" 627 " without using re-add. Please make sure the node" 628 " is healthy!") 629 630 # When changing the secondary ip, verify if this is a single-homed to 631 # multi-homed transition or vice versa, and apply the relevant 632 # restrictions. 633 if self.op.secondary_ip: 634 # Ok even without locking, because this can't be changed by any LU 635 master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) 636 master_singlehomed = master.secondary_ip == master.primary_ip 637 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 638 if self.op.force and node.name == master.name: 639 self.LogWarning("Transitioning from single-homed to multi-homed" 640 " cluster; all nodes will require a secondary IP" 641 " address") 642 else: 643 raise errors.OpPrereqError("Changing the secondary ip on a" 644 " single-homed cluster requires the" 645 " --force option to be passed, and the" 646 " target node to be the master", 647 errors.ECODE_INVAL) 648 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 649 if self.op.force and node.name == master.name: 650 self.LogWarning("Transitioning from multi-homed to single-homed" 651 " cluster; secondary IP addresses will have to be" 652 " removed") 653 else: 654 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 655 " same as the primary IP on a multi-homed" 656 " cluster, unless the --force option is" 657 " passed, and the target node is the" 658 " master", errors.ECODE_INVAL) 659 660 assert not (frozenset(affected_instances) - 661 self.owned_locks(locking.LEVEL_INSTANCE)) 662 663 if node.offline: 664 if affected_instances: 665 msg = ("Cannot change secondary IP address: offline node has" 666 " instances (%s) configured to use it" % 667 utils.CommaJoin(affected_instances.keys())) 668 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 669 else: 670 # On online nodes, check that no instances are running, and that 671 # the node has the new ip and we can reach it. 
672 for instance in affected_instances.values(): 673 CheckInstanceState(self, instance, INSTANCE_DOWN, 674 msg="cannot change secondary ip") 675 676 _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) 677 if master.name != node.name: 678 # check reachability from master secondary ip to new secondary ip 679 if not netutils.TcpPing(self.op.secondary_ip, 680 constants.DEFAULT_NODED_PORT, 681 source=master.secondary_ip): 682 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 683 " based ping to node daemon port", 684 errors.ECODE_ENVIRON) 685 686 if self.op.ndparams: 687 new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams) 688 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 689 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 690 "node", "cluster or group") 691 self.new_ndparams = new_ndparams 692 693 if self.op.hv_state: 694 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 695 self.node.hv_state_static) 696 697 if self.op.disk_state: 698 self.new_disk_state = \ 699 MergeAndVerifyDiskState(self.op.disk_state, 700 self.node.disk_state_static)
701
702 - def Exec(self, feedback_fn):
703 """Modifies a node. 704 705 """ 706 node = self.node 707 old_role = self.old_role 708 new_role = self.new_role 709 710 result = [] 711 712 if self.op.ndparams: 713 node.ndparams = self.new_ndparams 714 715 if self.op.powered is not None: 716 node.powered = self.op.powered 717 718 if self.op.hv_state: 719 node.hv_state_static = self.new_hv_state 720 721 if self.op.disk_state: 722 node.disk_state_static = self.new_disk_state 723 724 for attr in ["master_capable", "vm_capable"]: 725 val = getattr(self.op, attr) 726 if val is not None: 727 setattr(node, attr, val) 728 result.append((attr, str(val))) 729 730 if new_role != old_role: 731 # Tell the node to demote itself, if no longer MC and not offline 732 if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE: 733 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg 734 if msg: 735 self.LogWarning("Node failed to demote itself: %s", msg) 736 737 new_flags = self._R2F[new_role] 738 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): 739 if of != nf: 740 result.append((desc, str(nf))) 741 (node.master_candidate, node.drained, node.offline) = new_flags 742 743 # we locked all nodes, we adjust the CP before updating this node 744 if self.lock_all: 745 AdjustCandidatePool(self, [node.name]) 746 747 if self.op.secondary_ip: 748 node.secondary_ip = self.op.secondary_ip 749 result.append(("secondary_ip", self.op.secondary_ip)) 750 751 # this will trigger configuration file update, if needed 752 self.cfg.Update(node, feedback_fn) 753 754 # this will trigger job queue propagation or cleanup if the mc 755 # flag changed 756 if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1: 757 self.context.ReaddNode(node) 758 759 return result
760 761
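# --- Illustrative sketch (editorial addition, not part of this module) ----
# LUNodeSetParams encodes the (master_candidate, drained, offline) flags as a
# single role via _F2R and back via _R2F.  The same round trip with renamed
# constants (standalone toy code, not Ganeti APIs):
ROLE_CANDIDATE, ROLE_DRAINED, ROLE_OFFLINE, ROLE_REGULAR = range(4)

FLAGS_TO_ROLE = {
  (True, False, False): ROLE_CANDIDATE,
  (False, True, False): ROLE_DRAINED,
  (False, False, True): ROLE_OFFLINE,
  (False, False, False): ROLE_REGULAR,
  }
ROLE_TO_FLAGS = dict((v, k) for k, v in FLAGS_TO_ROLE.items())

# At most one flag may be True, so the mapping is a bijection both ways:
assert FLAGS_TO_ROLE[ROLE_TO_FLAGS[ROLE_DRAINED]] == ROLE_DRAINED
# Draining a node sets exactly the "drained" flag:
assert ROLE_TO_FLAGS[ROLE_DRAINED] == (False, True, False)
# ---------------------------------------------------------------------------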
762 -class LUNodePowercycle(NoHooksLU):
763 """Powercycles a node. 764 765 """ 766 REQ_BGL = False 767
768 - def CheckArguments(self):
769 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 770 if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force: 771 raise errors.OpPrereqError("The node is the master and the force" 772 " parameter was not set", 773 errors.ECODE_INVAL)
774
775 - def ExpandNames(self):
776 """Locking for PowercycleNode. 777 778 This is a last-resort option and shouldn't block on other 779 jobs. Therefore, we grab no locks. 780 781 """ 782 self.needed_locks = {}
783
784 - def Exec(self, feedback_fn):
785 """Reboots a node. 786 787 """ 788 result = self.rpc.call_node_powercycle(self.op.node_name, 789 self.cfg.GetHypervisorType()) 790 result.Raise("Failed to schedule the reboot") 791 return result.payload
792 793
794 -def _GetNodeInstancesInner(cfg, fn):
795 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
796 797
798 -def _GetNodePrimaryInstances(cfg, node_name):
799 """Returns primary instances on a node. 800 801 """ 802 return _GetNodeInstancesInner(cfg, 803 lambda inst: node_name == inst.primary_node)
804 805
806 -def _GetNodeSecondaryInstances(cfg, node_name):
807 """Returns secondary instances on a node. 808 809 """ 810 return _GetNodeInstancesInner(cfg, 811 lambda inst: node_name in inst.secondary_nodes)
812 813
814 -def _GetNodeInstances(cfg, node_name):
815 """Returns a list of all primary and secondary instances on a node. 816 817 """ 818 819 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
820 821
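# --- Illustrative sketch (editorial addition, not part of this module) ----
# The three helpers above are thin filters over the instance list.  Running
# the same predicates against stand-in instance records (toy namedtuples,
# not Ganeti objects) shows the primary/secondary/all split for "node1":
import collections

_Inst = collections.namedtuple("_Inst", ["name", "primary_node",
                                         "secondary_nodes"])

def _all_nodes(inst):
  return [inst.primary_node] + list(inst.secondary_nodes)

_instances = [
  _Inst("web", "node1", ["node2"]),
  _Inst("db", "node2", ["node1"]),
  _Inst("cache", "node3", []),
  ]

assert [i.name for i in _instances if i.primary_node == "node1"] == ["web"]
assert [i.name for i in _instances if "node1" in i.secondary_nodes] == ["db"]
assert [i.name for i in _instances
        if "node1" in _all_nodes(i)] == ["web", "db"]
# ---------------------------------------------------------------------------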
822 -class LUNodeEvacuate(NoHooksLU):
823 """Evacuates instances off a list of nodes. 824 825 """ 826 REQ_BGL = False 827 828 _MODE2IALLOCATOR = { 829 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, 830 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, 831 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, 832 } 833 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES 834 assert (frozenset(_MODE2IALLOCATOR.values()) == 835 constants.IALLOCATOR_NEVAC_MODES) 836
837 - def CheckArguments(self):
838 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
839
840 - def ExpandNames(self):
841 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 842 843 if self.op.remote_node is not None: 844 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node) 845 assert self.op.remote_node 846 847 if self.op.remote_node == self.op.node_name: 848 raise errors.OpPrereqError("Can not use evacuated node as a new" 849 " secondary node", errors.ECODE_INVAL) 850 851 if self.op.mode != constants.NODE_EVAC_SEC: 852 raise errors.OpPrereqError("Without the use of an iallocator only" 853 " secondary instances can be evacuated", 854 errors.ECODE_INVAL) 855 856 # Declare locks 857 self.share_locks = ShareAll() 858 self.needed_locks = { 859 locking.LEVEL_INSTANCE: [], 860 locking.LEVEL_NODEGROUP: [], 861 locking.LEVEL_NODE: [], 862 } 863 864 # Determine nodes (via group) optimistically, needs verification once locks 865 # have been acquired 866 self.lock_nodes = self._DetermineNodes()
867
868 - def _DetermineNodes(self):
869 """Gets the list of nodes to operate on. 870 871 """ 872 if self.op.remote_node is None: 873 # Iallocator will choose any node(s) in the same group 874 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) 875 else: 876 group_nodes = frozenset([self.op.remote_node]) 877 878 # Determine nodes to be locked 879 return set([self.op.node_name]) | group_nodes
880
881 - def _DetermineInstances(self):
882 """Builds list of instances to operate on. 883 884 """ 885 assert self.op.mode in constants.NODE_EVAC_MODES 886 887 if self.op.mode == constants.NODE_EVAC_PRI: 888 # Primary instances only 889 inst_fn = _GetNodePrimaryInstances 890 assert self.op.remote_node is None, \ 891 "Evacuating primary instances requires iallocator" 892 elif self.op.mode == constants.NODE_EVAC_SEC: 893 # Secondary instances only 894 inst_fn = _GetNodeSecondaryInstances 895 else: 896 # All instances 897 assert self.op.mode == constants.NODE_EVAC_ALL 898 inst_fn = _GetNodeInstances 899 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 900 # per instance 901 raise errors.OpPrereqError("Due to an issue with the iallocator" 902 " interface it is not possible to evacuate" 903 " all instances at once; specify explicitly" 904 " whether to evacuate primary or secondary" 905 " instances", 906 errors.ECODE_INVAL) 907 908 return inst_fn(self.cfg, self.op.node_name)
909
910 - def DeclareLocks(self, level):
911 if level == locking.LEVEL_INSTANCE: 912 # Lock instances optimistically, needs verification once node and group 913 # locks have been acquired 914 self.needed_locks[locking.LEVEL_INSTANCE] = \ 915 set(i.name for i in self._DetermineInstances()) 916 917 elif level == locking.LEVEL_NODEGROUP: 918 # Lock node groups for all potential target nodes optimistically, needs 919 # verification once nodes have been acquired 920 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 921 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 922 923 elif level == locking.LEVEL_NODE: 924 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
925
926 - def CheckPrereq(self):
927 # Verify locks 928 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) 929 owned_nodes = self.owned_locks(locking.LEVEL_NODE) 930 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 931 932 need_nodes = self._DetermineNodes() 933 934 if not owned_nodes.issuperset(need_nodes): 935 raise errors.OpPrereqError("Nodes in same group as '%s' changed since" 936 " locks were acquired, current nodes are" 937 " are '%s', used to be '%s'; retry the" 938 " operation" % 939 (self.op.node_name, 940 utils.CommaJoin(need_nodes), 941 utils.CommaJoin(owned_nodes)), 942 errors.ECODE_STATE) 943 944 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) 945 if owned_groups != wanted_groups: 946 raise errors.OpExecError("Node groups changed since locks were acquired," 947 " current groups are '%s', used to be '%s';" 948 " retry the operation" % 949 (utils.CommaJoin(wanted_groups), 950 utils.CommaJoin(owned_groups))) 951 952 # Determine affected instances 953 self.instances = self._DetermineInstances() 954 self.instance_names = [i.name for i in self.instances] 955 956 if set(self.instance_names) != owned_instances: 957 raise errors.OpExecError("Instances on node '%s' changed since locks" 958 " were acquired, current instances are '%s'," 959 " used to be '%s'; retry the operation" % 960 (self.op.node_name, 961 utils.CommaJoin(self.instance_names), 962 utils.CommaJoin(owned_instances))) 963 964 if self.instance_names: 965 self.LogInfo("Evacuating instances from node '%s': %s", 966 self.op.node_name, 967 utils.CommaJoin(utils.NiceSort(self.instance_names))) 968 else: 969 self.LogInfo("No instances to evacuate from node '%s'", 970 self.op.node_name) 971 972 if self.op.remote_node is not None: 973 for i in self.instances: 974 if i.primary_node == self.op.remote_node: 975 raise errors.OpPrereqError("Node %s is the primary node of" 976 " instance %s, cannot use it as" 977 " secondary" % 978 (self.op.remote_node, i.name), 979 errors.ECODE_INVAL)
980
981 - def Exec(self, feedback_fn):
982 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 983 984 if not self.instance_names: 985 # No instances to evacuate 986 jobs = [] 987 988 elif self.op.iallocator is not None: 989 # TODO: Implement relocation to other group 990 evac_mode = self._MODE2IALLOCATOR[self.op.mode] 991 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode, 992 instances=list(self.instance_names)) 993 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 994 995 ial.Run(self.op.iallocator) 996 997 if not ial.success: 998 raise errors.OpPrereqError("Can't compute node evacuation using" 999 " iallocator '%s': %s" % 1000 (self.op.iallocator, ial.info), 1001 errors.ECODE_NORES) 1002 1003 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1004 1005 elif self.op.remote_node is not None: 1006 assert self.op.mode == constants.NODE_EVAC_SEC 1007 jobs = [ 1008 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1009 remote_node=self.op.remote_node, 1010 disks=[], 1011 mode=constants.REPLACE_DISK_CHG, 1012 early_release=self.op.early_release)] 1013 for instance_name in self.instance_names] 1014 1015 else: 1016 raise errors.ProgrammerError("No iallocator or remote node") 1017 1018 return ResultWithJobs(jobs)
1019 1020
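# --- Illustrative sketch (editorial addition, not part of this module) ----
# In the remote-node branch of Exec above, the evacuation becomes one
# single-opcode job per instance.  The same shape with plain dictionaries
# standing in for opcode objects (toy keys and values, not the Ganeti job or
# opcode format):
def _evac_jobs(instance_names, remote_node, early_release):
  return [
    [{"op": "replace-disks",            # stand-in for OpInstanceReplaceDisks
      "instance_name": name,
      "remote_node": remote_node,
      "mode": "change-secondary",       # stand-in for REPLACE_DISK_CHG
      "early_release": early_release}]
    for name in instance_names]

jobs = _evac_jobs(["web", "db"], "node9", early_release=False)
# Two jobs, each containing exactly one operation:
assert len(jobs) == 2 and all(len(job) == 1 for job in jobs)
# ---------------------------------------------------------------------------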
1021 -class LUNodeMigrate(LogicalUnit):
1022 """Migrate all instances from a node. 1023 1024 """ 1025 HPATH = "node-migrate" 1026 HTYPE = constants.HTYPE_NODE 1027 REQ_BGL = False 1028
1029 - def CheckArguments(self):
1030 pass
1031
1032 - def ExpandNames(self):
1033 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 1034 1035 self.share_locks = ShareAll() 1036 self.needed_locks = { 1037 locking.LEVEL_NODE: [self.op.node_name], 1038 }
1039
1040 - def BuildHooksEnv(self):
1041 """Build hooks env. 1042 1043 This runs on the master, the primary and all the secondaries. 1044 1045 """ 1046 return { 1047 "NODE_NAME": self.op.node_name, 1048 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 1049 }
1050
1051 - def BuildHooksNodes(self):
1052 """Build hooks nodes. 1053 1054 """ 1055 nl = [self.cfg.GetMasterNode()] 1056 return (nl, nl)
1057
1058 - def CheckPrereq(self):
1059 pass
1060
1061 - def Exec(self, feedback_fn):
1062 # Prepare jobs for migration instances 1063 allow_runtime_changes = self.op.allow_runtime_changes 1064 jobs = [ 1065 [opcodes.OpInstanceMigrate(instance_name=inst.name, 1066 mode=self.op.mode, 1067 live=self.op.live, 1068 iallocator=self.op.iallocator, 1069 target_node=self.op.target_node, 1070 allow_runtime_changes=allow_runtime_changes, 1071 ignore_ipolicy=self.op.ignore_ipolicy)] 1072 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)] 1073 1074 # TODO: Run iallocator in this opcode and pass correct placement options to 1075 # OpInstanceMigrate. Since other jobs can modify the cluster between 1076 # running the iallocator and the actual migration, a good consistency model 1077 # will have to be found. 1078 1079 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == 1080 frozenset([self.op.node_name])) 1081 1082 return ResultWithJobs(jobs)
1083 1084
1085 -def _GetStorageTypeArgs(cfg, storage_type):
1086 """Returns the arguments for a storage type. 1087 1088 """ 1089 # Special case for file storage 1090 if storage_type == constants.ST_FILE: 1091 # storage.FileStorage wants a list of storage directories 1092 return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]] 1093 1094 return []
1095 1096
1097 -class LUNodeModifyStorage(NoHooksLU):
1098 """Logical unit for modifying a storage volume on a node. 1099 1100 """ 1101 REQ_BGL = False 1102
1103 - def CheckArguments(self):
1104 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 1105 1106 storage_type = self.op.storage_type 1107 1108 try: 1109 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] 1110 except KeyError: 1111 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1112 " modified" % storage_type, 1113 errors.ECODE_INVAL) 1114 1115 diff = set(self.op.changes.keys()) - modifiable 1116 if diff: 1117 raise errors.OpPrereqError("The following fields can not be modified for" 1118 " storage units of type '%s': %r" % 1119 (storage_type, list(diff)), 1120 errors.ECODE_INVAL)
1121
1122 - def ExpandNames(self):
1123 self.needed_locks = { 1124 locking.LEVEL_NODE: self.op.node_name, 1125 }
1126
1127 - def Exec(self, feedback_fn):
1128 """Computes the list of nodes and their attributes. 1129 1130 """ 1131 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1132 result = self.rpc.call_storage_modify(self.op.node_name, 1133 self.op.storage_type, st_args, 1134 self.op.name, self.op.changes) 1135 result.Raise("Failed to modify storage unit '%s' on %s" % 1136 (self.op.name, self.op.node_name))
1137 1138
1139 -class NodeQuery(QueryBase):
1140 FIELDS = query.NODE_FIELDS 1141
1142 - def ExpandNames(self, lu):
1143 lu.needed_locks = {} 1144 lu.share_locks = ShareAll() 1145 1146 if self.names: 1147 self.wanted = GetWantedNodes(lu, self.names) 1148 else: 1149 self.wanted = locking.ALL_SET 1150 1151 self.do_locking = (self.use_locking and 1152 query.NQ_LIVE in self.requested_data) 1153 1154 if self.do_locking: 1155 # If any non-static field is requested we need to lock the nodes 1156 lu.needed_locks[locking.LEVEL_NODE] = self.wanted 1157 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1158
1159 - def DeclareLocks(self, lu, level):
1160 pass
1161
1162 - def _GetQueryData(self, lu):
1163 """Computes the list of nodes and their attributes. 1164 1165 """ 1166 all_info = lu.cfg.GetAllNodesInfo() 1167 1168 nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE) 1169 1170 # Gather data as requested 1171 if query.NQ_LIVE in self.requested_data: 1172 # filter out non-vm_capable nodes 1173 toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] 1174 1175 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes) 1176 node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], 1177 [lu.cfg.GetHypervisorType()], es_flags) 1178 live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload)) 1179 for (name, nresult) in node_data.items() 1180 if not nresult.fail_msg and nresult.payload) 1181 else: 1182 live_data = None 1183 1184 if query.NQ_INST in self.requested_data: 1185 node_to_primary = dict([(name, set()) for name in nodenames]) 1186 node_to_secondary = dict([(name, set()) for name in nodenames]) 1187 1188 inst_data = lu.cfg.GetAllInstancesInfo() 1189 1190 for inst in inst_data.values(): 1191 if inst.primary_node in node_to_primary: 1192 node_to_primary[inst.primary_node].add(inst.name) 1193 for secnode in inst.secondary_nodes: 1194 if secnode in node_to_secondary: 1195 node_to_secondary[secnode].add(inst.name) 1196 else: 1197 node_to_primary = None 1198 node_to_secondary = None 1199 1200 if query.NQ_OOB in self.requested_data: 1201 oob_support = dict((name, bool(SupportsOob(lu.cfg, node))) 1202 for name, node in all_info.iteritems()) 1203 else: 1204 oob_support = None 1205 1206 if query.NQ_GROUP in self.requested_data: 1207 groups = lu.cfg.GetAllNodeGroupsInfo() 1208 else: 1209 groups = {} 1210 1211 return query.NodeQueryData([all_info[name] for name in nodenames], 1212 live_data, lu.cfg.GetMasterNode(), 1213 node_to_primary, node_to_secondary, groups, 1214 oob_support, lu.cfg.GetClusterInfo())
1215 1216
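# --- Illustrative sketch (editorial addition, not part of this module) ----
# For the NQ_INST data above, the query inverts the instance list into
# per-node sets of primary and secondary instances.  The same inversion on
# stand-in tuples (toy data, not Ganeti objects):
_insts = [("web", "node1", ["node2"]),
          ("db", "node2", ["node1", "node3"])]

node_to_primary = dict((n, set()) for n in ["node1", "node2", "node3"])
node_to_secondary = dict((n, set()) for n in ["node1", "node2", "node3"])
for name, pnode, snodes in _insts:
  node_to_primary[pnode].add(name)
  for snode in snodes:
    node_to_secondary[snode].add(name)

assert node_to_primary["node1"] == set(["web"])
assert node_to_secondary["node3"] == set(["db"])
# ---------------------------------------------------------------------------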
1217 -class LUNodeQuery(NoHooksLU):
1218 """Logical unit for querying nodes. 1219 1220 """ 1221 # pylint: disable=W0142 1222 REQ_BGL = False 1223
1224 - def CheckArguments(self):
1225 self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names), 1226 self.op.output_fields, self.op.use_locking)
1227
1228 - def ExpandNames(self):
1229 self.nq.ExpandNames(self)
1230
1231 - def DeclareLocks(self, level):
1232 self.nq.DeclareLocks(self, level)
1233
1234 - def Exec(self, feedback_fn):
1235 return self.nq.OldStyleQuery(self)
1236 1237
1238 -def _CheckOutputFields(static, dynamic, selected):
1239 """Checks whether all selected fields are valid. 1240 1241 @type static: L{utils.FieldSet} 1242 @param static: static fields set 1243 @type dynamic: L{utils.FieldSet} 1244 @param dynamic: dynamic fields set 1245 1246 """ 1247 f = utils.FieldSet() 1248 f.Extend(static) 1249 f.Extend(dynamic) 1250 1251 delta = f.NonMatching(selected) 1252 if delta: 1253 raise errors.OpPrereqError("Unknown output fields selected: %s" 1254 % ",".join(delta), errors.ECODE_INVAL)
1255 1256
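# --- Illustrative sketch (editorial addition, not part of this module) ----
# _CheckOutputFields rejects any requested field that is neither static nor
# dynamic.  The same check with plain sets (a simplification: utils.FieldSet
# matches field names more flexibly than literal set membership):
def _unknown_fields(static, dynamic, selected):
  known = set(static) | set(dynamic)
  return [f for f in selected if f not in known]

assert _unknown_fields(["node"], ["phys", "vg", "name", "size", "instance"],
                       ["node", "size"]) == []
assert _unknown_fields(["node"], ["phys"], ["bogus"]) == ["bogus"]
# ---------------------------------------------------------------------------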
1257 -class LUNodeQueryvols(NoHooksLU):
1258 """Logical unit for getting volumes on node(s). 1259 1260 """ 1261 REQ_BGL = False 1262 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") 1263 _FIELDS_STATIC = utils.FieldSet("node") 1264
1265 - def CheckArguments(self):
1266 _CheckOutputFields(static=self._FIELDS_STATIC, 1267 dynamic=self._FIELDS_DYNAMIC, 1268 selected=self.op.output_fields)
1269
1270 - def ExpandNames(self):
1271 self.share_locks = ShareAll() 1272 1273 if self.op.nodes: 1274 self.needed_locks = { 1275 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes), 1276 } 1277 else: 1278 self.needed_locks = { 1279 locking.LEVEL_NODE: locking.ALL_SET, 1280 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1281 }
1282
1283 - def Exec(self, feedback_fn):
1284 """Computes the list of nodes and their attributes. 1285 1286 """ 1287 nodenames = self.owned_locks(locking.LEVEL_NODE) 1288 volumes = self.rpc.call_node_volumes(nodenames) 1289 1290 ilist = self.cfg.GetAllInstancesInfo() 1291 vol2inst = MapInstanceDisksToNodes(ilist.values()) 1292 1293 output = [] 1294 for node in nodenames: 1295 nresult = volumes[node] 1296 if nresult.offline: 1297 continue 1298 msg = nresult.fail_msg 1299 if msg: 1300 self.LogWarning("Can't compute volume data on node %s: %s", node, msg) 1301 continue 1302 1303 node_vols = sorted(nresult.payload, 1304 key=operator.itemgetter("dev")) 1305 1306 for vol in node_vols: 1307 node_output = [] 1308 for field in self.op.output_fields: 1309 if field == "node": 1310 val = node 1311 elif field == "phys": 1312 val = vol["dev"] 1313 elif field == "vg": 1314 val = vol["vg"] 1315 elif field == "name": 1316 val = vol["name"] 1317 elif field == "size": 1318 val = int(float(vol["size"])) 1319 elif field == "instance": 1320 val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") 1321 else: 1322 raise errors.ParameterError(field) 1323 node_output.append(str(val)) 1324 1325 output.append(node_output) 1326 1327 return output
1328 1329
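# --- Illustrative sketch (editorial addition, not part of this module) ----
# Exec above flattens each node's volume payload into rows ordered by the
# requested output fields.  The same field dispatch run against a stand-in
# payload entry (toy data, not an RPC result):
def _vol_row(node, vol, fields):
  row = []
  for field in fields:
    if field == "node":
      row.append(node)
    elif field == "phys":
      row.append(vol["dev"])
    elif field == "size":
      row.append(str(int(float(vol["size"]))))
    else:  # "vg" and "name" map straight through
      row.append(vol[field])
  return row

_vol = {"dev": "/dev/xenvg/disk0", "vg": "xenvg",
        "name": "disk0", "size": "10240.00"}
assert _vol_row("node1", _vol, ["node", "name", "size"]) == \
    ["node1", "disk0", "10240"]
# ---------------------------------------------------------------------------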
1330 -class LUNodeQueryStorage(NoHooksLU):
1331 """Logical unit for getting information on storage units on node(s). 1332 1333 """ 1334 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) 1335 REQ_BGL = False 1336
1337 - def CheckArguments(self):
1338 _CheckOutputFields(static=self._FIELDS_STATIC, 1339 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), 1340 selected=self.op.output_fields)
1341
1342 - def ExpandNames(self):
1343 self.share_locks = ShareAll() 1344 1345 if self.op.nodes: 1346 self.needed_locks = { 1347 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes), 1348 } 1349 else: 1350 self.needed_locks = { 1351 locking.LEVEL_NODE: locking.ALL_SET, 1352 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1353 }
1354
1355 - def Exec(self, feedback_fn):
1356 """Computes the list of nodes and their attributes. 1357 1358 """ 1359 self.nodes = self.owned_locks(locking.LEVEL_NODE) 1360 1361 # Always get name to sort by 1362 if constants.SF_NAME in self.op.output_fields: 1363 fields = self.op.output_fields[:] 1364 else: 1365 fields = [constants.SF_NAME] + self.op.output_fields 1366 1367 # Never ask for node or type as it's only known to the LU 1368 for extra in [constants.SF_NODE, constants.SF_TYPE]: 1369 while extra in fields: 1370 fields.remove(extra) 1371 1372 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) 1373 name_idx = field_idx[constants.SF_NAME] 1374 1375 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1376 data = self.rpc.call_storage_list(self.nodes, 1377 self.op.storage_type, st_args, 1378 self.op.name, fields) 1379 1380 result = [] 1381 1382 for node in utils.NiceSort(self.nodes): 1383 nresult = data[node] 1384 if nresult.offline: 1385 continue 1386 1387 msg = nresult.fail_msg 1388 if msg: 1389 self.LogWarning("Can't get storage data from node %s: %s", node, msg) 1390 continue 1391 1392 rows = dict([(row[name_idx], row) for row in nresult.payload]) 1393 1394 for name in utils.NiceSort(rows.keys()): 1395 row = rows[name] 1396 1397 out = [] 1398 1399 for field in self.op.output_fields: 1400 if field == constants.SF_NODE: 1401 val = node 1402 elif field == constants.SF_TYPE: 1403 val = self.op.storage_type 1404 elif field in field_idx: 1405 val = row[field_idx[field]] 1406 else: 1407 raise errors.ParameterError(field) 1408 1409 out.append(val) 1410 1411 result.append(out) 1412 1413 return result
1414 1415
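# --- Illustrative sketch (editorial addition, not part of this module) ----
# LUNodeQueryStorage always asks the backend for the name field (used for
# sorting) and strips the node/type fields, which it fills in locally.  In
# miniature (toy helper, not a Ganeti API):
def _backend_fields(output_fields, name_field="name",
                    local_fields=("node", "type")):
  fields = list(output_fields)
  if name_field not in fields:
    fields = [name_field] + fields
  fields = [f for f in fields if f not in local_fields]
  return fields, dict((name, idx) for idx, name in enumerate(fields))

fields, field_idx = _backend_fields(["node", "size", "free"])
assert fields == ["name", "size", "free"]
assert field_idx["size"] == 1
# ---------------------------------------------------------------------------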
1416 -class LUNodeRemove(LogicalUnit):
1417 """Logical unit for removing a node. 1418 1419 """ 1420 HPATH = "node-remove" 1421 HTYPE = constants.HTYPE_NODE 1422
1423 - def BuildHooksEnv(self):
1424 """Build hooks env. 1425 1426 """ 1427 return { 1428 "OP_TARGET": self.op.node_name, 1429 "NODE_NAME": self.op.node_name, 1430 }
1431
1432 - def BuildHooksNodes(self):
1433 """Build hooks nodes. 1434 1435 This doesn't run on the target node in the pre phase as a failed 1436 node would then be impossible to remove. 1437 1438 """ 1439 all_nodes = self.cfg.GetNodeList() 1440 try: 1441 all_nodes.remove(self.op.node_name) 1442 except ValueError: 1443 pass 1444 return (all_nodes, all_nodes)
1445
1446 - def CheckPrereq(self):
1447 """Check prerequisites. 1448 1449 This checks: 1450 - the node exists in the configuration 1451 - it does not have primary or secondary instances 1452 - it's not the master 1453 1454 Any errors are signaled by raising errors.OpPrereqError. 1455 1456 """ 1457 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 1458 node = self.cfg.GetNodeInfo(self.op.node_name) 1459 assert node is not None 1460 1461 masternode = self.cfg.GetMasterNode() 1462 if node.name == masternode: 1463 raise errors.OpPrereqError("Node is the master node, failover to another" 1464 " node is required", errors.ECODE_INVAL) 1465 1466 for instance_name, instance in self.cfg.GetAllInstancesInfo().items(): 1467 if node.name in instance.all_nodes: 1468 raise errors.OpPrereqError("Instance %s is still running on the node," 1469 " please remove first" % instance_name, 1470 errors.ECODE_INVAL) 1471 self.op.node_name = node.name 1472 self.node = node
1473
1474 - def Exec(self, feedback_fn):
1475 """Removes the node from the cluster. 1476 1477 """ 1478 node = self.node 1479 logging.info("Stopping the node daemon and removing configs from node %s", 1480 node.name) 1481 1482 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1483 1484 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 1485 "Not owning BGL" 1486 1487 # Promote nodes to master candidate as needed 1488 AdjustCandidatePool(self, exceptions=[node.name]) 1489 self.context.RemoveNode(node.name) 1490 1491 # Run post hooks on the node before it's removed 1492 RunPostHook(self, node.name) 1493 1494 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) 1495 msg = result.fail_msg 1496 if msg: 1497 self.LogWarning("Errors encountered on the remote node while leaving" 1498 " the cluster: %s", msg) 1499 1500 # Remove node from our /etc/hosts 1501 if self.cfg.GetClusterInfo().modify_etc_hosts: 1502 master_node = self.cfg.GetMasterNode() 1503 result = self.rpc.call_etc_hosts_modify(master_node, 1504 constants.ETC_HOSTS_REMOVE, 1505 node.name, None) 1506 result.Raise("Can't update hosts file with new host data") 1507 RedistributeAncillaryFiles(self)
1508 1509
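# --- Illustrative sketch (editorial addition, not part of this module) ----
# LUNodeRemove.BuildHooksNodes drops the node being removed from both hook
# phases so that an unreachable node cannot block its own removal.  In
# miniature (toy helper, not a Ganeti API):
def _hook_nodes_for_removal(all_nodes, removed):
  nodes = [n for n in all_nodes if n != removed]
  return (nodes, nodes)

assert _hook_nodes_for_removal(["node1", "node2", "node3"], "node2") == \
    (["node1", "node3"], ["node1", "node3"])
# ---------------------------------------------------------------------------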
1510 -class LURepairNodeStorage(NoHooksLU):
1511 """Repairs the volume group on a node. 1512 1513 """ 1514 REQ_BGL = False 1515
1516 - def CheckArguments(self):
1517 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name) 1518 1519 storage_type = self.op.storage_type 1520 1521 if (constants.SO_FIX_CONSISTENCY not in 1522 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): 1523 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1524 " repaired" % storage_type, 1525 errors.ECODE_INVAL)
1526
1527 - def ExpandNames(self):
1528 self.needed_locks = { 1529 locking.LEVEL_NODE: [self.op.node_name], 1530 }
1531
1532 - def _CheckFaultyDisks(self, instance, node_name):
1533 """Ensure faulty disks abort the opcode or at least warn.""" 1534 try: 1535 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance, 1536 node_name, True): 1537 raise errors.OpPrereqError("Instance '%s' has faulty disks on" 1538 " node '%s'" % (instance.name, node_name), 1539 errors.ECODE_STATE) 1540 except errors.OpPrereqError, err: 1541 if self.op.ignore_consistency: 1542 self.LogWarning(str(err.args[0])) 1543 else: 1544 raise
1545
1546 - def CheckPrereq(self):
1547 """Check prerequisites. 1548 1549 """ 1550 # Check whether any instance on this node has faulty disks 1551 for inst in _GetNodeInstances(self.cfg, self.op.node_name): 1552 if not inst.disks_active: 1553 continue 1554 check_nodes = set(inst.all_nodes) 1555 check_nodes.discard(self.op.node_name) 1556 for inst_node_name in check_nodes: 1557 self._CheckFaultyDisks(inst, inst_node_name)
1558
1559 - def Exec(self, feedback_fn):
1560 feedback_fn("Repairing storage unit '%s' on %s ..." % 1561 (self.op.name, self.op.node_name)) 1562 1563 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1564 result = self.rpc.call_storage_execute(self.op.node_name, 1565 self.op.storage_type, st_args, 1566 self.op.name, 1567 constants.SO_FIX_CONSISTENCY) 1568 result.Raise("Failed to repair storage unit '%s' on %s" % 1569 (self.op.name, self.op.node_name))
1570