
Source Code for Module ganeti.cmdlib.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with nodes.""" 
  32   
  33  import logging 
  34  import operator 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import netutils 
  40  from ganeti import objects 
  41  from ganeti import opcodes 
  42  import ganeti.rpc.node as rpc 
  43  from ganeti import utils 
  44  from ganeti.masterd import iallocator 
  45   
  46  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs 
  47  from ganeti.cmdlib.common import CheckParamsNotGlobal, \ 
  48    MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  49    IsExclusiveStorageEnabledNode, CheckNodePVs, \ 
  50    RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ 
  51    CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ 
  52    AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ 
  53    GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ 
  54    FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \ 
  55    AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ 
  56    EnsureKvmdOnNodes 
  57   
  58   
  59  def _DecideSelfPromotion(lu, exceptions=None):
  60    """Decide whether I should promote myself as a master candidate.
  61
  62    """
  63    cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  64    mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  65    # the new node will increase mc_max by one, so:
  66    mc_should = min(mc_should + 1, cp_size)
  67    return mc_now < mc_should
  68
  69
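# Editorial illustration (not part of the module source): a minimal sketch of
# the promotion rule above, using a hypothetical helper name and numbers. With
# candidate_pool_size = 10 and 3 current master candidates, adding a node
# raises the target to min(3 + 1, 10) = 4, so 3 < 4 and the new node promotes
# itself; a full pool never grows past the configured size.

def should_self_promote(mc_now, mc_should, cp_size):
  # mirror of _DecideSelfPromotion without the configuration lookups
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should

assert should_self_promote(mc_now=3, mc_should=3, cp_size=10)
assert not should_self_promote(mc_now=10, mc_should=10, cp_size=10)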
  70  def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  71    """Ensure that a node has the given secondary ip.
  72
  73    @type lu: L{LogicalUnit}
  74    @param lu: the LU on behalf of which we make the check
  75    @type node: L{objects.Node}
  76    @param node: the node to check
  77    @type secondary_ip: string
  78    @param secondary_ip: the ip to check
  79    @type prereq: boolean
  80    @param prereq: whether to throw a prerequisite or an execute error
  81    @raise errors.OpPrereqError: if the node doesn't have the ip,
  82        and prereq=True
  83    @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
  84
  85    """
  86    # this can be called with a new node, which has no UUID yet, so perform the
  87    # RPC call using its name
  88    result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  89    result.Raise("Failure checking secondary ip on node %s" % node.name,
  90                 prereq=prereq, ecode=errors.ECODE_ENVIRON)
  91    if not result.payload:
  92      msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
  93             " please fix and re-run this command" % secondary_ip)
  94      if prereq:
  95        raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  96      else:
  97        raise errors.OpExecError(msg)
  98
  99
100 -class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster. 102 103 """ 104 HPATH = "node-add" 105 HTYPE = constants.HTYPE_NODE 106 _NFLAGS = ["master_capable", "vm_capable"] 107
108 - def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily() 110 # validate/normalize the node name 111 self.hostname = netutils.GetHostname(name=self.op.node_name, 112 family=self.primary_ip_family) 113 self.op.node_name = self.hostname.name 114 115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName(): 116 raise errors.OpPrereqError("Cannot readd the master node", 117 errors.ECODE_STATE) 118 119 if self.op.readd and self.op.group: 120 raise errors.OpPrereqError("Cannot pass a node group when a node is" 121 " being readded", errors.ECODE_INVAL)
122
123 - def BuildHooksEnv(self):
124 """Build hooks env. 125 126 This will run on all nodes before, and on all nodes + the new node after. 127 128 """ 129 return { 130 "OP_TARGET": self.op.node_name, 131 "NODE_NAME": self.op.node_name, 132 "NODE_PIP": self.op.primary_ip, 133 "NODE_SIP": self.op.secondary_ip, 134 "MASTER_CAPABLE": str(self.op.master_capable), 135 "VM_CAPABLE": str(self.op.vm_capable), 136 }
137
 138    def BuildHooksNodes(self):
 139      """Build hooks nodes.
 140
 141      """
 142      hook_nodes = self.cfg.GetNodeList()
 143      new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
 144      if new_node_info is not None:
 145        # Exclude added node
 146        hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
 147
 148      # add the new node as post hook node by name; it does not have a UUID yet
 149      return (hook_nodes, hook_nodes)
150
151 - def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
154 - def CheckPrereq(self):
155 """Check prerequisites. 156 157 This checks: 158 - the new node is not already in the config 159 - it is resolvable 160 - its parameters (single/dual homed) matches the cluster 161 162 Any errors are signaled by raising errors.OpPrereqError. 163 164 """ 165 node_name = self.hostname.name 166 self.op.primary_ip = self.hostname.ip 167 if self.op.secondary_ip is None: 168 if self.primary_ip_family == netutils.IP6Address.family: 169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid" 170 " IPv4 address must be given as secondary", 171 errors.ECODE_INVAL) 172 self.op.secondary_ip = self.op.primary_ip 173 174 secondary_ip = self.op.secondary_ip 175 if not netutils.IP4Address.IsValid(secondary_ip): 176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 177 " address" % secondary_ip, errors.ECODE_INVAL) 178 179 existing_node_info = self.cfg.GetNodeInfoByName(node_name) 180 if not self.op.readd and existing_node_info is not None: 181 raise errors.OpPrereqError("Node %s is already in the configuration" % 182 node_name, errors.ECODE_EXISTS) 183 elif self.op.readd and existing_node_info is None: 184 raise errors.OpPrereqError("Node %s is not in the configuration" % 185 node_name, errors.ECODE_NOENT) 186 187 self.changed_primary_ip = False 188 189 for existing_node in self.cfg.GetAllNodesInfo().values(): 190 if self.op.readd and node_name == existing_node.name: 191 if existing_node.secondary_ip != secondary_ip: 192 raise errors.OpPrereqError("Readded node doesn't have the same IP" 193 " address configuration as before", 194 errors.ECODE_INVAL) 195 if existing_node.primary_ip != self.op.primary_ip: 196 self.changed_primary_ip = True 197 198 continue 199 200 if (existing_node.primary_ip == self.op.primary_ip or 201 existing_node.secondary_ip == self.op.primary_ip or 202 existing_node.primary_ip == secondary_ip or 203 existing_node.secondary_ip == secondary_ip): 204 raise errors.OpPrereqError("New node ip address(es) conflict with" 205 " existing node %s" % existing_node.name, 206 errors.ECODE_NOTUNIQUE) 207 208 # After this 'if' block, None is no longer a valid value for the 209 # _capable op attributes 210 if self.op.readd: 211 assert existing_node_info is not None, \ 212 "Can't retrieve locked node %s" % node_name 213 for attr in self._NFLAGS: 214 if getattr(self.op, attr) is None: 215 setattr(self.op, attr, getattr(existing_node_info, attr)) 216 else: 217 for attr in self._NFLAGS: 218 if getattr(self.op, attr) is None: 219 setattr(self.op, attr, True) 220 221 if self.op.readd and not self.op.vm_capable: 222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid) 223 if pri or sec: 224 raise errors.OpPrereqError("Node %s being re-added with vm_capable" 225 " flag set to false, but it already holds" 226 " instances" % node_name, 227 errors.ECODE_STATE) 228 229 # check that the type of the node (single versus dual homed) is the 230 # same as for the master 231 myself = self.cfg.GetMasterNodeInfo() 232 master_singlehomed = myself.secondary_ip == myself.primary_ip 233 newbie_singlehomed = secondary_ip == self.op.primary_ip 234 if master_singlehomed != newbie_singlehomed: 235 if master_singlehomed: 236 raise errors.OpPrereqError("The master has no secondary ip but the" 237 " new node has one", 238 errors.ECODE_INVAL) 239 else: 240 raise errors.OpPrereqError("The master has a secondary ip but the" 241 " new node doesn't have one", 242 errors.ECODE_INVAL) 243 244 # checks reachability 245 if not netutils.TcpPing(self.op.primary_ip, 
constants.DEFAULT_NODED_PORT): 246 raise errors.OpPrereqError("Node not reachable by ping", 247 errors.ECODE_ENVIRON) 248 249 if not newbie_singlehomed: 250 # check reachability from my secondary ip to newbie's secondary ip 251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, 252 source=myself.secondary_ip): 253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 254 " based ping to node daemon port", 255 errors.ECODE_ENVIRON) 256 257 if self.op.readd: 258 exceptions = [existing_node_info.uuid] 259 else: 260 exceptions = [] 261 262 if self.op.master_capable: 263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) 264 else: 265 self.master_candidate = False 266 267 self.node_group = None 268 if self.op.readd: 269 self.new_node = existing_node_info 270 self.node_group = existing_node_info.group 271 else: 272 self.node_group = self.cfg.LookupNodeGroup(self.op.group) 273 self.new_node = objects.Node(name=node_name, 274 primary_ip=self.op.primary_ip, 275 secondary_ip=secondary_ip, 276 master_candidate=self.master_candidate, 277 offline=False, drained=False, 278 group=self.node_group, ndparams={}) 279 280 if self.op.ndparams: 281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 283 "node", "cluster or group") 284 285 if self.op.hv_state: 286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) 287 288 if self.op.disk_state: 289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) 290 291 # TODO: If we need to have multiple DnsOnlyRunner we probably should make 292 # it a property on the base class. 293 rpcrunner = rpc.DnsOnlyRunner() 294 result = rpcrunner.call_version([node_name])[node_name] 295 result.Raise("Can't get version information from node %s" % node_name, 296 prereq=True) 297 if constants.PROTOCOL_VERSION == result.payload: 298 logging.info("Communication to node %s fine, sw version %s match", 299 node_name, result.payload) 300 else: 301 raise errors.OpPrereqError("Version mismatch master version %s," 302 " node version %s" % 303 (constants.PROTOCOL_VERSION, result.payload), 304 errors.ECODE_ENVIRON) 305 306 vg_name = self.cfg.GetVGName() 307 if vg_name is not None: 308 vparams = {constants.NV_PVLIST: [vg_name]} 309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node) 310 cname = self.cfg.GetClusterName() 311 result = rpcrunner.call_node_verify_light( 312 [node_name], vparams, cname, 313 self.cfg.GetClusterInfo().hvparams, 314 {node_name: self.node_group}, 315 self.cfg.GetAllNodeGroupsInfoDict() 316 )[node_name] 317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) 318 if errmsgs: 319 raise errors.OpPrereqError("Checks on node PVs failed: %s" % 320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
322 - def _InitOpenVSwitch(self):
323 filled_ndparams = self.cfg.GetClusterInfo().FillND( 324 self.new_node, self.cfg.GetNodeGroup(self.new_node.group)) 325 326 ovs = filled_ndparams.get(constants.ND_OVS, None) 327 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None) 328 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None) 329 330 if ovs: 331 if not ovs_link: 332 self.LogInfo("No physical interface for OpenvSwitch was given." 333 " OpenvSwitch will not have an outside connection. This" 334 " might not be what you want.") 335 336 result = self.rpc.call_node_configure_ovs( 337 self.new_node.name, ovs_name, ovs_link) 338 result.Raise("Failed to initialize OpenVSwitch on new node")
339
340 - def Exec(self, feedback_fn):
341 """Adds the new node to the cluster. 342 343 """ 344 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 345 "Not owning BGL" 346 347 # We adding a new node so we assume it's powered 348 self.new_node.powered = True 349 350 # for re-adds, reset the offline/drained/master-candidate flags; 351 # we need to reset here, otherwise offline would prevent RPC calls 352 # later in the procedure; this also means that if the re-add 353 # fails, we are left with a non-offlined, broken node 354 if self.op.readd: 355 self.new_node.offline = False 356 self.new_node.drained = False 357 self.LogInfo("Readding a node, the offline/drained flags were reset") 358 # if we demote the node, we do cleanup later in the procedure 359 self.new_node.master_candidate = self.master_candidate 360 if self.changed_primary_ip: 361 self.new_node.primary_ip = self.op.primary_ip 362 363 # copy the master/vm_capable flags 364 for attr in self._NFLAGS: 365 setattr(self.new_node, attr, getattr(self.op, attr)) 366 367 # notify the user about any possible mc promotion 368 if self.new_node.master_candidate: 369 self.LogInfo("Node will be a master candidate") 370 371 if self.op.ndparams: 372 self.new_node.ndparams = self.op.ndparams 373 else: 374 self.new_node.ndparams = {} 375 376 if self.op.hv_state: 377 self.new_node.hv_state_static = self.new_hv_state 378 379 if self.op.disk_state: 380 self.new_node.disk_state_static = self.new_disk_state 381 382 # Add node to our /etc/hosts, and add key to known_hosts 383 if self.cfg.GetClusterInfo().modify_etc_hosts: 384 master_node = self.cfg.GetMasterNode() 385 result = self.rpc.call_etc_hosts_modify( 386 master_node, constants.ETC_HOSTS_ADD, self.hostname.name, 387 self.hostname.ip) 388 result.Raise("Can't update hosts file with new host data") 389 390 if self.new_node.secondary_ip != self.new_node.primary_ip: 391 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip, 392 False) 393 394 node_verifier_uuids = [self.cfg.GetMasterNode()] 395 node_verify_param = { 396 constants.NV_NODELIST: ([self.new_node.name], {}), 397 # TODO: do a node-net-test as well? 
398 } 399 400 result = self.rpc.call_node_verify( 401 node_verifier_uuids, node_verify_param, 402 self.cfg.GetClusterName(), 403 self.cfg.GetClusterInfo().hvparams, 404 {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)}, 405 self.cfg.GetAllNodeGroupsInfoDict() 406 ) 407 for verifier in node_verifier_uuids: 408 result[verifier].Raise("Cannot communicate with node %s" % verifier) 409 nl_payload = result[verifier].payload[constants.NV_NODELIST] 410 if nl_payload: 411 for failed in nl_payload: 412 feedback_fn("ssh/hostname verification failed" 413 " (checking from %s): %s" % 414 (verifier, nl_payload[failed])) 415 raise errors.OpExecError("ssh/hostname verification failed") 416 417 self._InitOpenVSwitch() 418 419 if self.op.readd: 420 self.context.ReaddNode(self.new_node) 421 RedistributeAncillaryFiles(self) 422 # make sure we redistribute the config 423 self.cfg.Update(self.new_node, feedback_fn) 424 # and make sure the new node will not have old files around 425 if not self.new_node.master_candidate: 426 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid) 427 result.Warn("Node failed to demote itself from master candidate status", 428 self.LogWarning) 429 else: 430 self.context.AddNode(self.new_node, self.proc.GetECId()) 431 RedistributeAncillaryFiles(self) 432 433 cluster = self.cfg.GetClusterInfo() 434 # We create a new certificate even if the node is readded 435 digest = CreateNewClientCert(self, self.new_node.uuid) 436 if self.new_node.master_candidate: 437 utils.AddNodeToCandidateCerts(self.new_node.uuid, digest, 438 cluster.candidate_certs) 439 self.cfg.Update(cluster, feedback_fn) 440 else: 441 if self.new_node.uuid in cluster.candidate_certs: 442 utils.RemoveNodeFromCandidateCerts(self.new_node.uuid, 443 cluster.candidate_certs) 444 self.cfg.Update(cluster, feedback_fn) 445 446 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
447 448
449 -class LUNodeSetParams(LogicalUnit):
450 """Modifies the parameters of a node. 451 452 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) 453 to the node role (as _ROLE_*) 454 @cvar _R2F: a dictionary from node role to tuples of flags 455 @cvar _FLAGS: a list of attribute names corresponding to the flags 456 457 """ 458 HPATH = "node-modify" 459 HTYPE = constants.HTYPE_NODE 460 REQ_BGL = False 461 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) 462 _F2R = { 463 (True, False, False): _ROLE_CANDIDATE, 464 (False, True, False): _ROLE_DRAINED, 465 (False, False, True): _ROLE_OFFLINE, 466 (False, False, False): _ROLE_REGULAR, 467 } 468 _R2F = dict((v, k) for k, v in _F2R.items()) 469 _FLAGS = ["master_candidate", "drained", "offline"] 470
471 - def CheckArguments(self):
472 (self.op.node_uuid, self.op.node_name) = \ 473 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 474 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, 475 self.op.master_capable, self.op.vm_capable, 476 self.op.secondary_ip, self.op.ndparams, self.op.hv_state, 477 self.op.disk_state] 478 if all_mods.count(None) == len(all_mods): 479 raise errors.OpPrereqError("Please pass at least one modification", 480 errors.ECODE_INVAL) 481 if all_mods.count(True) > 1: 482 raise errors.OpPrereqError("Can't set the node into more than one" 483 " state at the same time", 484 errors.ECODE_INVAL) 485 486 # Boolean value that tells us whether we might be demoting from MC 487 self.might_demote = (self.op.master_candidate is False or 488 self.op.offline is True or 489 self.op.drained is True or 490 self.op.master_capable is False) 491 492 if self.op.secondary_ip: 493 if not netutils.IP4Address.IsValid(self.op.secondary_ip): 494 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" 495 " address" % self.op.secondary_ip, 496 errors.ECODE_INVAL) 497 498 self.lock_all = self.op.auto_promote and self.might_demote 499 self.lock_instances = self.op.secondary_ip is not None
500
501 - def _InstanceFilter(self, instance):
502 """Filter for getting affected instances. 503 504 """ 505 return (instance.disk_template in constants.DTS_INT_MIRROR and 506 self.op.node_uuid in instance.all_nodes)
507
508 - def ExpandNames(self):
509 if self.lock_all: 510 self.needed_locks = { 511 locking.LEVEL_NODE: locking.ALL_SET, 512 513 # Block allocations when all nodes are locked 514 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 515 } 516 else: 517 self.needed_locks = { 518 locking.LEVEL_NODE: self.op.node_uuid, 519 } 520 521 # Since modifying a node can have severe effects on currently running 522 # operations the resource lock is at least acquired in shared mode 523 self.needed_locks[locking.LEVEL_NODE_RES] = \ 524 self.needed_locks[locking.LEVEL_NODE] 525 526 # Get all locks except nodes in shared mode; they are not used for anything 527 # but read-only access 528 self.share_locks = ShareAll() 529 self.share_locks[locking.LEVEL_NODE] = 0 530 self.share_locks[locking.LEVEL_NODE_RES] = 0 531 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 532 533 if self.lock_instances: 534 self.needed_locks[locking.LEVEL_INSTANCE] = \ 535 self.cfg.GetInstanceNames( 536 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
537
538 - def BuildHooksEnv(self):
539 """Build hooks env. 540 541 This runs on the master node. 542 543 """ 544 return { 545 "OP_TARGET": self.op.node_name, 546 "MASTER_CANDIDATE": str(self.op.master_candidate), 547 "OFFLINE": str(self.op.offline), 548 "DRAINED": str(self.op.drained), 549 "MASTER_CAPABLE": str(self.op.master_capable), 550 "VM_CAPABLE": str(self.op.vm_capable), 551 }
552
553 - def BuildHooksNodes(self):
554 """Build hooks nodes. 555 556 """ 557 nl = [self.cfg.GetMasterNode(), self.op.node_uuid] 558 return (nl, nl)
559
560 - def CheckPrereq(self):
561 """Check prerequisites. 562 563 This only checks the instance list against the existing names. 564 565 """ 566 node = self.cfg.GetNodeInfo(self.op.node_uuid) 567 if self.lock_instances: 568 affected_instances = \ 569 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) 570 571 # Verify instance locks 572 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 573 wanted_instance_names = frozenset([inst.name for inst in 574 affected_instances.values()]) 575 if wanted_instance_names - owned_instance_names: 576 raise errors.OpPrereqError("Instances affected by changing node %s's" 577 " secondary IP address have changed since" 578 " locks were acquired, wanted '%s', have" 579 " '%s'; retry the operation" % 580 (node.name, 581 utils.CommaJoin(wanted_instance_names), 582 utils.CommaJoin(owned_instance_names)), 583 errors.ECODE_STATE) 584 else: 585 affected_instances = None 586 587 if (self.op.master_candidate is not None or 588 self.op.drained is not None or 589 self.op.offline is not None): 590 # we can't change the master's node flags 591 if node.uuid == self.cfg.GetMasterNode(): 592 raise errors.OpPrereqError("The master role can be changed" 593 " only via master-failover", 594 errors.ECODE_INVAL) 595 596 if self.op.master_candidate and not node.master_capable: 597 raise errors.OpPrereqError("Node %s is not master capable, cannot make" 598 " it a master candidate" % node.name, 599 errors.ECODE_STATE) 600 601 if self.op.vm_capable is False: 602 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) 603 if ipri or isec: 604 raise errors.OpPrereqError("Node %s hosts instances, cannot unset" 605 " the vm_capable flag" % node.name, 606 errors.ECODE_STATE) 607 608 if node.master_candidate and self.might_demote and not self.lock_all: 609 assert not self.op.auto_promote, "auto_promote set but lock_all not" 610 # check if after removing the current node, we're missing master 611 # candidates 612 (mc_remaining, mc_should, _) = \ 613 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) 614 if mc_remaining < mc_should: 615 raise errors.OpPrereqError("Not enough master candidates, please" 616 " pass auto promote option to allow" 617 " promotion (--auto-promote or RAPI" 618 " auto_promote=True)", errors.ECODE_STATE) 619 620 self.old_flags = old_flags = (node.master_candidate, 621 node.drained, node.offline) 622 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) 623 self.old_role = old_role = self._F2R[old_flags] 624 625 # Check for ineffective changes 626 for attr in self._FLAGS: 627 if getattr(self.op, attr) is False and getattr(node, attr) is False: 628 self.LogInfo("Ignoring request to unset flag %s, already unset", attr) 629 setattr(self.op, attr, None) 630 631 # Past this point, any flag change to False means a transition 632 # away from the respective state, as only real changes are kept 633 634 # TODO: We might query the real power state if it supports OOB 635 if SupportsOob(self.cfg, node): 636 if self.op.offline is False and not (node.powered or 637 self.op.powered is True): 638 raise errors.OpPrereqError(("Node %s needs to be turned on before its" 639 " offline status can be reset") % 640 self.op.node_name, errors.ECODE_STATE) 641 elif self.op.powered is not None: 642 raise errors.OpPrereqError(("Unable to change powered state for node %s" 643 " as it does not support out-of-band" 644 " handling") % self.op.node_name, 645 errors.ECODE_STATE) 646 647 # If we're being deofflined/drained, we'll MC ourself if needed 648 if (self.op.drained is False or 
self.op.offline is False or 649 (self.op.master_capable and not node.master_capable)): 650 if _DecideSelfPromotion(self): 651 self.op.master_candidate = True 652 self.LogInfo("Auto-promoting node to master candidate") 653 654 # If we're no longer master capable, we'll demote ourselves from MC 655 if self.op.master_capable is False and node.master_candidate: 656 self.LogInfo("Demoting from master candidate") 657 self.op.master_candidate = False 658 659 # Compute new role 660 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 661 if self.op.master_candidate: 662 new_role = self._ROLE_CANDIDATE 663 elif self.op.drained: 664 new_role = self._ROLE_DRAINED 665 elif self.op.offline: 666 new_role = self._ROLE_OFFLINE 667 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: 668 # False is still in new flags, which means we're un-setting (the 669 # only) True flag 670 new_role = self._ROLE_REGULAR 671 else: # no new flags, nothing, keep old role 672 new_role = old_role 673 674 self.new_role = new_role 675 676 if old_role == self._ROLE_OFFLINE and new_role != old_role: 677 # Trying to transition out of offline status 678 result = self.rpc.call_version([node.uuid])[node.uuid] 679 if result.fail_msg: 680 raise errors.OpPrereqError("Node %s is being de-offlined but fails" 681 " to report its version: %s" % 682 (node.name, result.fail_msg), 683 errors.ECODE_STATE) 684 else: 685 self.LogWarning("Transitioning node from offline to online state" 686 " without using re-add. Please make sure the node" 687 " is healthy!") 688 689 # When changing the secondary ip, verify if this is a single-homed to 690 # multi-homed transition or vice versa, and apply the relevant 691 # restrictions. 692 if self.op.secondary_ip: 693 # Ok even without locking, because this can't be changed by any LU 694 master = self.cfg.GetMasterNodeInfo() 695 master_singlehomed = master.secondary_ip == master.primary_ip 696 if master_singlehomed and self.op.secondary_ip != node.primary_ip: 697 if self.op.force and node.uuid == master.uuid: 698 self.LogWarning("Transitioning from single-homed to multi-homed" 699 " cluster; all nodes will require a secondary IP" 700 " address") 701 else: 702 raise errors.OpPrereqError("Changing the secondary ip on a" 703 " single-homed cluster requires the" 704 " --force option to be passed, and the" 705 " target node to be the master", 706 errors.ECODE_INVAL) 707 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: 708 if self.op.force and node.uuid == master.uuid: 709 self.LogWarning("Transitioning from multi-homed to single-homed" 710 " cluster; secondary IP addresses will have to be" 711 " removed") 712 else: 713 raise errors.OpPrereqError("Cannot set the secondary IP to be the" 714 " same as the primary IP on a multi-homed" 715 " cluster, unless the --force option is" 716 " passed, and the target node is the" 717 " master", errors.ECODE_INVAL) 718 719 assert not (set([inst.name for inst in affected_instances.values()]) - 720 self.owned_locks(locking.LEVEL_INSTANCE)) 721 722 if node.offline: 723 if affected_instances: 724 msg = ("Cannot change secondary IP address: offline node has" 725 " instances (%s) configured to use it" % 726 utils.CommaJoin( 727 [inst.name for inst in affected_instances.values()])) 728 raise errors.OpPrereqError(msg, errors.ECODE_STATE) 729 else: 730 # On online nodes, check that no instances are running, and that 731 # the node has the new ip and we can reach it. 
732 for instance in affected_instances.values(): 733 CheckInstanceState(self, instance, INSTANCE_DOWN, 734 msg="cannot change secondary ip") 735 736 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) 737 if master.uuid != node.uuid: 738 # check reachability from master secondary ip to new secondary ip 739 if not netutils.TcpPing(self.op.secondary_ip, 740 constants.DEFAULT_NODED_PORT, 741 source=master.secondary_ip): 742 raise errors.OpPrereqError("Node secondary ip not reachable by TCP" 743 " based ping to node daemon port", 744 errors.ECODE_ENVIRON) 745 746 if self.op.ndparams: 747 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) 748 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) 749 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", 750 "node", "cluster or group") 751 self.new_ndparams = new_ndparams 752 753 if self.op.hv_state: 754 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 755 node.hv_state_static) 756 757 if self.op.disk_state: 758 self.new_disk_state = \ 759 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
760
761 - def Exec(self, feedback_fn):
762 """Modifies a node. 763 764 """ 765 node = self.cfg.GetNodeInfo(self.op.node_uuid) 766 result = [] 767 768 if self.op.ndparams: 769 node.ndparams = self.new_ndparams 770 771 if self.op.powered is not None: 772 node.powered = self.op.powered 773 774 if self.op.hv_state: 775 node.hv_state_static = self.new_hv_state 776 777 if self.op.disk_state: 778 node.disk_state_static = self.new_disk_state 779 780 for attr in ["master_capable", "vm_capable"]: 781 val = getattr(self.op, attr) 782 if val is not None: 783 setattr(node, attr, val) 784 result.append((attr, str(val))) 785 786 if self.new_role != self.old_role: 787 # Tell the node to demote itself, if no longer MC and not offline 788 if self.old_role == self._ROLE_CANDIDATE and \ 789 self.new_role != self._ROLE_OFFLINE: 790 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg 791 if msg: 792 self.LogWarning("Node failed to demote itself: %s", msg) 793 794 new_flags = self._R2F[self.new_role] 795 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): 796 if of != nf: 797 result.append((desc, str(nf))) 798 (node.master_candidate, node.drained, node.offline) = new_flags 799 800 # we locked all nodes, we adjust the CP before updating this node 801 if self.lock_all: 802 AdjustCandidatePool(self, [node.uuid], feedback_fn) 803 804 cluster = self.cfg.GetClusterInfo() 805 # if node gets promoted, grant RPC priviledges 806 if self.new_role == self._ROLE_CANDIDATE: 807 AddNodeCertToCandidateCerts(self, node.uuid, cluster) 808 # if node is demoted, revoke RPC priviledges 809 if self.old_role == self._ROLE_CANDIDATE: 810 RemoveNodeCertFromCandidateCerts(node.uuid, cluster) 811 812 if self.op.secondary_ip: 813 node.secondary_ip = self.op.secondary_ip 814 result.append(("secondary_ip", self.op.secondary_ip)) 815 816 # this will trigger configuration file update, if needed 817 self.cfg.Update(node, feedback_fn) 818 819 # this will trigger job queue propagation or cleanup if the mc 820 # flag changed 821 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1: 822 self.context.ReaddNode(node) 823 824 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid]) 825 826 return result
827 828
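# Editorial illustration (not part of the module source): a minimal sketch of
# the role encoding used by LUNodeSetParams above. Each node role corresponds
# to exactly one combination of the (master_candidate, drained, offline)
# flags, so the two dictionaries are simple inverses of each other; role names
# here are illustrative strings rather than the class's _ROLE_* constants.

_example_f2r = {
  (True, False, False): "candidate",
  (False, True, False): "drained",
  (False, False, True): "offline",
  (False, False, False): "regular",
}
_example_r2f = dict((v, k) for k, v in _example_f2r.items())

flags = (True, False, False)
role = _example_f2r[flags]          # -> "candidate"
assert _example_r2f[role] == flags  # the mapping round-trips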
829 -class LUNodePowercycle(NoHooksLU):
830 """Powercycles a node. 831 832 """ 833 REQ_BGL = False 834
835 - def CheckArguments(self):
836 (self.op.node_uuid, self.op.node_name) = \ 837 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 838 839 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force: 840 raise errors.OpPrereqError("The node is the master and the force" 841 " parameter was not set", 842 errors.ECODE_INVAL)
843
844 - def ExpandNames(self):
845 """Locking for PowercycleNode. 846 847 This is a last-resort option and shouldn't block on other 848 jobs. Therefore, we grab no locks. 849 850 """ 851 self.needed_locks = {}
852
853 - def Exec(self, feedback_fn):
854 """Reboots a node. 855 856 """ 857 default_hypervisor = self.cfg.GetHypervisorType() 858 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor] 859 result = self.rpc.call_node_powercycle(self.op.node_uuid, 860 default_hypervisor, 861 hvparams) 862 result.Raise("Failed to schedule the reboot") 863 return result.payload
864 865
 866  def _GetNodeInstancesInner(cfg, fn):
 867    return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
 868
 869
 870  def _GetNodePrimaryInstances(cfg, node_uuid):
 871    """Returns primary instances on a node.
 872
 873    """
 874    return _GetNodeInstancesInner(cfg,
 875                                  lambda inst: node_uuid == inst.primary_node)
 876
 877
 878  def _GetNodeSecondaryInstances(cfg, node_uuid):
 879    """Returns secondary instances on a node.
 880
 881    """
 882    return _GetNodeInstancesInner(cfg,
 883                                  lambda inst: node_uuid in inst.secondary_nodes)
 884
 885
 886  def _GetNodeInstances(cfg, node_uuid):
 887    """Returns a list of all primary and secondary instances on a node.
 888
 889    """
 890
 891    return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
892 893
894 -class LUNodeEvacuate(NoHooksLU):
895 """Evacuates instances off a list of nodes. 896 897 """ 898 REQ_BGL = False 899
900 - def CheckArguments(self):
901 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
902
903 - def ExpandNames(self):
904 (self.op.node_uuid, self.op.node_name) = \ 905 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 906 907 if self.op.remote_node is not None: 908 (self.op.remote_node_uuid, self.op.remote_node) = \ 909 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 910 self.op.remote_node) 911 assert self.op.remote_node 912 913 if self.op.node_uuid == self.op.remote_node_uuid: 914 raise errors.OpPrereqError("Can not use evacuated node as a new" 915 " secondary node", errors.ECODE_INVAL) 916 917 if self.op.mode != constants.NODE_EVAC_SEC: 918 raise errors.OpPrereqError("Without the use of an iallocator only" 919 " secondary instances can be evacuated", 920 errors.ECODE_INVAL) 921 922 # Declare locks 923 self.share_locks = ShareAll() 924 self.needed_locks = { 925 locking.LEVEL_INSTANCE: [], 926 locking.LEVEL_NODEGROUP: [], 927 locking.LEVEL_NODE: [], 928 } 929 930 # Determine nodes (via group) optimistically, needs verification once locks 931 # have been acquired 932 self.lock_nodes = self._DetermineNodes()
933
934 - def _DetermineNodes(self):
935 """Gets the list of node UUIDs to operate on. 936 937 """ 938 if self.op.remote_node is None: 939 # Iallocator will choose any node(s) in the same group 940 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) 941 else: 942 group_nodes = frozenset([self.op.remote_node_uuid]) 943 944 # Determine nodes to be locked 945 return set([self.op.node_uuid]) | group_nodes
946
947 - def _DetermineInstances(self):
948 """Builds list of instances to operate on. 949 950 """ 951 assert self.op.mode in constants.NODE_EVAC_MODES 952 953 if self.op.mode == constants.NODE_EVAC_PRI: 954 # Primary instances only 955 inst_fn = _GetNodePrimaryInstances 956 assert self.op.remote_node is None, \ 957 "Evacuating primary instances requires iallocator" 958 elif self.op.mode == constants.NODE_EVAC_SEC: 959 # Secondary instances only 960 inst_fn = _GetNodeSecondaryInstances 961 else: 962 # All instances 963 assert self.op.mode == constants.NODE_EVAC_ALL 964 inst_fn = _GetNodeInstances 965 # TODO: In 2.6, change the iallocator interface to take an evacuation mode 966 # per instance 967 raise errors.OpPrereqError("Due to an issue with the iallocator" 968 " interface it is not possible to evacuate" 969 " all instances at once; specify explicitly" 970 " whether to evacuate primary or secondary" 971 " instances", 972 errors.ECODE_INVAL) 973 974 return inst_fn(self.cfg, self.op.node_uuid)
975
976 - def DeclareLocks(self, level):
977 if level == locking.LEVEL_INSTANCE: 978 # Lock instances optimistically, needs verification once node and group 979 # locks have been acquired 980 self.needed_locks[locking.LEVEL_INSTANCE] = \ 981 set(i.name for i in self._DetermineInstances()) 982 983 elif level == locking.LEVEL_NODEGROUP: 984 # Lock node groups for all potential target nodes optimistically, needs 985 # verification once nodes have been acquired 986 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 987 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) 988 989 elif level == locking.LEVEL_NODE: 990 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
991
992 - def CheckPrereq(self):
993 # Verify locks 994 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) 995 owned_nodes = self.owned_locks(locking.LEVEL_NODE) 996 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 997 998 need_nodes = self._DetermineNodes() 999 1000 if not owned_nodes.issuperset(need_nodes): 1001 raise errors.OpPrereqError("Nodes in same group as '%s' changed since" 1002 " locks were acquired, current nodes are" 1003 " are '%s', used to be '%s'; retry the" 1004 " operation" % 1005 (self.op.node_name, 1006 utils.CommaJoin(need_nodes), 1007 utils.CommaJoin(owned_nodes)), 1008 errors.ECODE_STATE) 1009 1010 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) 1011 if owned_groups != wanted_groups: 1012 raise errors.OpExecError("Node groups changed since locks were acquired," 1013 " current groups are '%s', used to be '%s';" 1014 " retry the operation" % 1015 (utils.CommaJoin(wanted_groups), 1016 utils.CommaJoin(owned_groups))) 1017 1018 # Determine affected instances 1019 self.instances = self._DetermineInstances() 1020 self.instance_names = [i.name for i in self.instances] 1021 1022 if set(self.instance_names) != owned_instance_names: 1023 raise errors.OpExecError("Instances on node '%s' changed since locks" 1024 " were acquired, current instances are '%s'," 1025 " used to be '%s'; retry the operation" % 1026 (self.op.node_name, 1027 utils.CommaJoin(self.instance_names), 1028 utils.CommaJoin(owned_instance_names))) 1029 1030 if self.instance_names: 1031 self.LogInfo("Evacuating instances from node '%s': %s", 1032 self.op.node_name, 1033 utils.CommaJoin(utils.NiceSort(self.instance_names))) 1034 else: 1035 self.LogInfo("No instances to evacuate from node '%s'", 1036 self.op.node_name) 1037 1038 if self.op.remote_node is not None: 1039 for i in self.instances: 1040 if i.primary_node == self.op.remote_node_uuid: 1041 raise errors.OpPrereqError("Node %s is the primary node of" 1042 " instance %s, cannot use it as" 1043 " secondary" % 1044 (self.op.remote_node, i.name), 1045 errors.ECODE_INVAL)
1046
1047 - def Exec(self, feedback_fn):
1048 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) 1049 1050 if not self.instance_names: 1051 # No instances to evacuate 1052 jobs = [] 1053 1054 elif self.op.iallocator is not None: 1055 # TODO: Implement relocation to other group 1056 req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode, 1057 instances=list(self.instance_names)) 1058 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 1059 1060 ial.Run(self.op.iallocator) 1061 1062 if not ial.success: 1063 raise errors.OpPrereqError("Can't compute node evacuation using" 1064 " iallocator '%s': %s" % 1065 (self.op.iallocator, ial.info), 1066 errors.ECODE_NORES) 1067 1068 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) 1069 1070 elif self.op.remote_node is not None: 1071 assert self.op.mode == constants.NODE_EVAC_SEC 1072 jobs = [ 1073 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 1074 remote_node=self.op.remote_node, 1075 disks=[], 1076 mode=constants.REPLACE_DISK_CHG, 1077 early_release=self.op.early_release)] 1078 for instance_name in self.instance_names] 1079 1080 else: 1081 raise errors.ProgrammerError("No iallocator or remote node") 1082 1083 return ResultWithJobs(jobs)
1084 1085
1086 -class LUNodeMigrate(LogicalUnit):
1087 """Migrate all instances from a node. 1088 1089 """ 1090 HPATH = "node-migrate" 1091 HTYPE = constants.HTYPE_NODE 1092 REQ_BGL = False 1093
1094 - def CheckArguments(self):
1095 pass
1096
1097 - def ExpandNames(self):
1098 (self.op.node_uuid, self.op.node_name) = \ 1099 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1100 1101 self.share_locks = ShareAll() 1102 self.needed_locks = { 1103 locking.LEVEL_NODE: [self.op.node_uuid], 1104 }
1105
1106 - def BuildHooksEnv(self):
1107 """Build hooks env. 1108 1109 This runs on the master, the primary and all the secondaries. 1110 1111 """ 1112 return { 1113 "NODE_NAME": self.op.node_name, 1114 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 1115 }
1116
1117 - def BuildHooksNodes(self):
1118 """Build hooks nodes. 1119 1120 """ 1121 nl = [self.cfg.GetMasterNode()] 1122 return (nl, nl)
1123
1124 - def CheckPrereq(self):
1125 pass
1126
1127 - def Exec(self, feedback_fn):
1128 # Prepare jobs for migration instances 1129 jobs = [ 1130 [opcodes.OpInstanceMigrate( 1131 instance_name=inst.name, 1132 mode=self.op.mode, 1133 live=self.op.live, 1134 iallocator=self.op.iallocator, 1135 target_node=self.op.target_node, 1136 allow_runtime_changes=self.op.allow_runtime_changes, 1137 ignore_ipolicy=self.op.ignore_ipolicy)] 1138 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)] 1139 1140 # TODO: Run iallocator in this opcode and pass correct placement options to 1141 # OpInstanceMigrate. Since other jobs can modify the cluster between 1142 # running the iallocator and the actual migration, a good consistency model 1143 # will have to be found. 1144 1145 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == 1146 frozenset([self.op.node_uuid])) 1147 1148 return ResultWithJobs(jobs)
1149 1150
1151  def _GetStorageTypeArgs(cfg, storage_type):
1152    """Returns the arguments for a storage type.
1153
1154    """
1155    # Special case for file storage
1156
1157    if storage_type == constants.ST_FILE:
1158      return [[cfg.GetFileStorageDir()]]
1159    elif storage_type == constants.ST_SHARED_FILE:
1160      dts = cfg.GetClusterInfo().enabled_disk_templates
1161      paths = []
1162      if constants.DT_SHARED_FILE in dts:
1163        paths.append(cfg.GetSharedFileStorageDir())
1164      if constants.DT_GLUSTER in dts:
1165        paths.append(cfg.GetGlusterStorageDir())
1166      return [paths]
1167    else:
1168      return []
1169 1170
1171 -class LUNodeModifyStorage(NoHooksLU):
1172 """Logical unit for modifying a storage volume on a node. 1173 1174 """ 1175 REQ_BGL = False 1176
1177 - def CheckArguments(self):
1178 (self.op.node_uuid, self.op.node_name) = \ 1179 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1180 1181 storage_type = self.op.storage_type 1182 1183 try: 1184 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] 1185 except KeyError: 1186 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1187 " modified" % storage_type, 1188 errors.ECODE_INVAL) 1189 1190 diff = set(self.op.changes.keys()) - modifiable 1191 if diff: 1192 raise errors.OpPrereqError("The following fields can not be modified for" 1193 " storage units of type '%s': %r" % 1194 (storage_type, list(diff)), 1195 errors.ECODE_INVAL)
1196
1197 - def CheckPrereq(self):
1198 """Check prerequisites. 1199 1200 """ 1201 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1202
1203 - def ExpandNames(self):
1204 self.needed_locks = { 1205 locking.LEVEL_NODE: self.op.node_uuid, 1206 }
1207
1208 - def Exec(self, feedback_fn):
1209 """Computes the list of nodes and their attributes. 1210 1211 """ 1212 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1213 result = self.rpc.call_storage_modify(self.op.node_uuid, 1214 self.op.storage_type, st_args, 1215 self.op.name, self.op.changes) 1216 result.Raise("Failed to modify storage unit '%s' on %s" % 1217 (self.op.name, self.op.node_name))
1218 1219
1220  def _CheckOutputFields(fields, selected):
1221    """Checks whether all selected fields are valid according to fields.
1222
1223    @type fields: L{utils.FieldSet}
1224    @param fields: the set of valid output fields
1225    @type selected: list of strings
1226    @param selected: the output fields selected by the user
1227
1228    """
1229    delta = fields.NonMatching(selected)
1230    if delta:
1231      raise errors.OpPrereqError("Unknown output fields selected: %s"
1232                                 % ",".join(delta), errors.ECODE_INVAL)
1233 1234
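# Editorial illustration (not part of the module source): a hypothetical use
# of _CheckOutputFields above. utils.FieldSet collects the valid field names,
# NonMatching() returns whichever selected fields are not among them, and
# _CheckOutputFields turns a non-empty result into an OpPrereqError. The field
# names below are made up for the example.

valid_fields = utils.FieldSet("node", "name", "size")
assert not valid_fields.NonMatching(["node", "size"])   # all selected fields valid

try:
  _CheckOutputFields(valid_fields, ["node", "bogus"])
except errors.OpPrereqError:
  pass  # "bogus" is not a valid output field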
1235 -class LUNodeQueryvols(NoHooksLU):
1236 """Logical unit for getting volumes on node(s). 1237 1238 """ 1239 REQ_BGL = False 1240
1241    def CheckArguments(self):
1242      _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1243                                        constants.VF_VG, constants.VF_NAME,
1244                                        constants.VF_SIZE, constants.VF_INSTANCE),
1245                         self.op.output_fields)
1246
1247 - def ExpandNames(self):
1248 self.share_locks = ShareAll() 1249 1250 if self.op.nodes: 1251 self.needed_locks = { 1252 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1253 } 1254 else: 1255 self.needed_locks = { 1256 locking.LEVEL_NODE: locking.ALL_SET, 1257 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1258 }
1259
1260 - def Exec(self, feedback_fn):
1261 """Computes the list of nodes and their attributes. 1262 1263 """ 1264 node_uuids = self.owned_locks(locking.LEVEL_NODE) 1265 volumes = self.rpc.call_node_volumes(node_uuids) 1266 1267 ilist = self.cfg.GetAllInstancesInfo() 1268 vol2inst = MapInstanceLvsToNodes(ilist.values()) 1269 1270 output = [] 1271 for node_uuid in node_uuids: 1272 nresult = volumes[node_uuid] 1273 if nresult.offline: 1274 continue 1275 msg = nresult.fail_msg 1276 if msg: 1277 self.LogWarning("Can't compute volume data on node %s: %s", 1278 self.cfg.GetNodeName(node_uuid), msg) 1279 continue 1280 1281 node_vols = sorted(nresult.payload, 1282 key=operator.itemgetter(constants.VF_DEV)) 1283 1284 for vol in node_vols: 1285 node_output = [] 1286 for field in self.op.output_fields: 1287 if field == constants.VF_NODE: 1288 val = self.cfg.GetNodeName(node_uuid) 1289 elif field == constants.VF_PHYS: 1290 val = vol[constants.VF_DEV] 1291 elif field == constants.VF_VG: 1292 val = vol[constants.VF_VG] 1293 elif field == constants.VF_NAME: 1294 val = vol[constants.VF_NAME] 1295 elif field == constants.VF_SIZE: 1296 val = int(float(vol[constants.VF_SIZE])) 1297 elif field == constants.VF_INSTANCE: 1298 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" + 1299 vol[constants.VF_NAME]), None) 1300 if inst is not None: 1301 val = inst.name 1302 else: 1303 val = "-" 1304 else: 1305 raise errors.ParameterError(field) 1306 node_output.append(str(val)) 1307 1308 output.append(node_output) 1309 1310 return output
1311 1312
1313 -class LUNodeQueryStorage(NoHooksLU):
1314 """Logical unit for getting information on storage units on node(s). 1315 1316 """ 1317 REQ_BGL = False 1318
1319 - def CheckArguments(self):
1320 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS), 1321 self.op.output_fields)
1322
1323 - def ExpandNames(self):
1324 self.share_locks = ShareAll() 1325 1326 if self.op.nodes: 1327 self.needed_locks = { 1328 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], 1329 } 1330 else: 1331 self.needed_locks = { 1332 locking.LEVEL_NODE: locking.ALL_SET, 1333 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 1334 }
1335
1336 - def _DetermineStorageType(self):
1337 """Determines the default storage type of the cluster. 1338 1339 """ 1340 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates 1341 default_storage_type = \ 1342 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]] 1343 return default_storage_type
1344
1345 - def CheckPrereq(self):
1346 """Check prerequisites. 1347 1348 """ 1349 if self.op.storage_type: 1350 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1351 self.storage_type = self.op.storage_type 1352 else: 1353 self.storage_type = self._DetermineStorageType() 1354 supported_storage_types = constants.STS_REPORT_NODE_STORAGE 1355 if self.storage_type not in supported_storage_types: 1356 raise errors.OpPrereqError( 1357 "Storage reporting for storage type '%s' is not supported. Please" 1358 " use the --storage-type option to specify one of the supported" 1359 " storage types (%s) or set the default disk template to one that" 1360 " supports storage reporting." % 1361 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1362
1363 - def Exec(self, feedback_fn):
1364 """Computes the list of nodes and their attributes. 1365 1366 """ 1367 if self.op.storage_type: 1368 self.storage_type = self.op.storage_type 1369 else: 1370 self.storage_type = self._DetermineStorageType() 1371 1372 self.node_uuids = self.owned_locks(locking.LEVEL_NODE) 1373 1374 # Always get name to sort by 1375 if constants.SF_NAME in self.op.output_fields: 1376 fields = self.op.output_fields[:] 1377 else: 1378 fields = [constants.SF_NAME] + self.op.output_fields 1379 1380 # Never ask for node or type as it's only known to the LU 1381 for extra in [constants.SF_NODE, constants.SF_TYPE]: 1382 while extra in fields: 1383 fields.remove(extra) 1384 1385 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) 1386 name_idx = field_idx[constants.SF_NAME] 1387 1388 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type) 1389 data = self.rpc.call_storage_list(self.node_uuids, 1390 self.storage_type, st_args, 1391 self.op.name, fields) 1392 1393 result = [] 1394 1395 for node_uuid in utils.NiceSort(self.node_uuids): 1396 node_name = self.cfg.GetNodeName(node_uuid) 1397 nresult = data[node_uuid] 1398 if nresult.offline: 1399 continue 1400 1401 msg = nresult.fail_msg 1402 if msg: 1403 self.LogWarning("Can't get storage data from node %s: %s", 1404 node_name, msg) 1405 continue 1406 1407 rows = dict([(row[name_idx], row) for row in nresult.payload]) 1408 1409 for name in utils.NiceSort(rows.keys()): 1410 row = rows[name] 1411 1412 out = [] 1413 1414 for field in self.op.output_fields: 1415 if field == constants.SF_NODE: 1416 val = node_name 1417 elif field == constants.SF_TYPE: 1418 val = self.storage_type 1419 elif field in field_idx: 1420 val = row[field_idx[field]] 1421 else: 1422 raise errors.ParameterError(field) 1423 1424 out.append(val) 1425 1426 result.append(out) 1427 1428 return result
1429 1430
1431 -class LUNodeRemove(LogicalUnit):
1432 """Logical unit for removing a node. 1433 1434 """ 1435 HPATH = "node-remove" 1436 HTYPE = constants.HTYPE_NODE 1437
1438 - def BuildHooksEnv(self):
1439 """Build hooks env. 1440 1441 """ 1442 return { 1443 "OP_TARGET": self.op.node_name, 1444 "NODE_NAME": self.op.node_name, 1445 }
1446
1447 - def BuildHooksNodes(self):
1448 """Build hooks nodes. 1449 1450 This doesn't run on the target node in the pre phase as a failed 1451 node would then be impossible to remove. 1452 1453 """ 1454 all_nodes = self.cfg.GetNodeList() 1455 try: 1456 all_nodes.remove(self.op.node_uuid) 1457 except ValueError: 1458 pass 1459 return (all_nodes, all_nodes)
1460
1461 - def CheckPrereq(self):
1462 """Check prerequisites. 1463 1464 This checks: 1465 - the node exists in the configuration 1466 - it does not have primary or secondary instances 1467 - it's not the master 1468 1469 Any errors are signaled by raising errors.OpPrereqError. 1470 1471 """ 1472 (self.op.node_uuid, self.op.node_name) = \ 1473 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1474 node = self.cfg.GetNodeInfo(self.op.node_uuid) 1475 assert node is not None 1476 1477 masternode = self.cfg.GetMasterNode() 1478 if node.uuid == masternode: 1479 raise errors.OpPrereqError("Node is the master node, failover to another" 1480 " node is required", errors.ECODE_INVAL) 1481 1482 for _, instance in self.cfg.GetAllInstancesInfo().items(): 1483 if node.uuid in instance.all_nodes: 1484 raise errors.OpPrereqError("Instance %s is still running on the node," 1485 " please remove first" % instance.name, 1486 errors.ECODE_INVAL) 1487 self.op.node_name = node.name 1488 self.node = node
1489
1490 - def Exec(self, feedback_fn):
1491 """Removes the node from the cluster. 1492 1493 """ 1494 logging.info("Stopping the node daemon and removing configs from node %s", 1495 self.node.name) 1496 1497 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup 1498 1499 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ 1500 "Not owning BGL" 1501 1502 # Promote nodes to master candidate as needed 1503 AdjustCandidatePool(self, [self.node.uuid], feedback_fn) 1504 self.context.RemoveNode(self.node) 1505 1506 # Run post hooks on the node before it's removed 1507 RunPostHook(self, self.node.name) 1508 1509 # we have to call this by name rather than by UUID, as the node is no longer 1510 # in the config 1511 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup) 1512 msg = result.fail_msg 1513 if msg: 1514 self.LogWarning("Errors encountered on the remote node while leaving" 1515 " the cluster: %s", msg) 1516 1517 cluster = self.cfg.GetClusterInfo() 1518 1519 # Remove node from candidate certificate list 1520 if self.node.master_candidate: 1521 utils.RemoveNodeFromCandidateCerts(self.node.uuid, 1522 cluster.candidate_certs) 1523 self.cfg.Update(cluster, feedback_fn) 1524 1525 # Remove node from our /etc/hosts 1526 if cluster.modify_etc_hosts: 1527 master_node_uuid = self.cfg.GetMasterNode() 1528 result = self.rpc.call_etc_hosts_modify(master_node_uuid, 1529 constants.ETC_HOSTS_REMOVE, 1530 self.node.name, None) 1531 result.Raise("Can't update hosts file with new host data") 1532 RedistributeAncillaryFiles(self)
1533 1534
1535 -class LURepairNodeStorage(NoHooksLU):
1536 """Repairs the volume group on a node. 1537 1538 """ 1539 REQ_BGL = False 1540
1541 - def CheckArguments(self):
1542 (self.op.node_uuid, self.op.node_name) = \ 1543 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) 1544 1545 storage_type = self.op.storage_type 1546 1547 if (constants.SO_FIX_CONSISTENCY not in 1548 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): 1549 raise errors.OpPrereqError("Storage units of type '%s' can not be" 1550 " repaired" % storage_type, 1551 errors.ECODE_INVAL)
1552
1553 - def ExpandNames(self):
1554 self.needed_locks = { 1555 locking.LEVEL_NODE: [self.op.node_uuid], 1556 }
1557
1558 - def _CheckFaultyDisks(self, instance, node_uuid):
1559 """Ensure faulty disks abort the opcode or at least warn.""" 1560 try: 1561 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance, 1562 node_uuid, True): 1563 raise errors.OpPrereqError("Instance '%s' has faulty disks on" 1564 " node '%s'" % 1565 (instance.name, 1566 self.cfg.GetNodeName(node_uuid)), 1567 errors.ECODE_STATE) 1568 except errors.OpPrereqError, err: 1569 if self.op.ignore_consistency: 1570 self.LogWarning(str(err.args[0])) 1571 else: 1572 raise
1573
1574 - def CheckPrereq(self):
1575 """Check prerequisites. 1576 1577 """ 1578 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) 1579 1580 # Check whether any instance on this node has faulty disks 1581 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid): 1582 if not inst.disks_active: 1583 continue 1584 check_nodes = set(inst.all_nodes) 1585 check_nodes.discard(self.op.node_uuid) 1586 for inst_node_uuid in check_nodes: 1587 self._CheckFaultyDisks(inst, inst_node_uuid)
1588
1589 - def Exec(self, feedback_fn):
1590 feedback_fn("Repairing storage unit '%s' on %s ..." % 1591 (self.op.name, self.op.node_name)) 1592 1593 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) 1594 result = self.rpc.call_storage_execute(self.op.node_uuid, 1595 self.op.storage_type, st_args, 1596 self.op.name, 1597 constants.SO_FIX_CONSISTENCY) 1598 result.Raise("Failed to repair storage unit '%s' on %s" % 1599 (self.op.name, self.op.node_name))
1600