
Source Code for Module ganeti.cmdlib.group

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with node groups."""

import itertools
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import MergeAndVerifyHvState, \
  MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
  CheckNodeGroupInstances, GetUpdatedIPolicy, \
  ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
  CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, ConnectInstanceCommunicationNetworkOp

import ganeti.masterd.instance

class LUGroupAdd(LogicalUnit):
  """Logical unit for creating node groups.

  """
  HPATH = "group-add"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # We need the new group's UUID here so that we can create and acquire the
    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
    # that it should not check whether the UUID exists in the configuration.
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def _CheckIpolicy(self):
    """Checks the group's ipolicy for consistency and validity.

    """
    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)
      CheckIpolicyVsDiskTemplates(full_ipolicy,
                                  cluster.enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name is not an existing node group
    already.

    """
    try:
      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (self.op.group_name, existing_uuid),
                                 errors.ECODE_EXISTS)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
    else:
      self.new_hv_state = None

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
    else:
      self.new_disk_state = None

    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    self._CheckIpolicy()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  @staticmethod
  def _ConnectInstanceCommunicationNetwork(cfg, group_uuid, network_name):
    """Connect a node group to the instance communication network.

    The group is connected to the instance communication network via
    the Opcode 'OpNetworkConnect'.

    @type cfg: L{ganeti.config.ConfigWriter}
    @param cfg: Ganeti configuration

    @type group_uuid: string
    @param group_uuid: UUID of the group to connect

    @type network_name: string
    @param network_name: name of the network to connect to

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the group needs to be
             connected, otherwise (the group is already connected)
             L{None}

    """
    try:
      cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if network_exists:
      op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
      return ResultWithJobs([[op]])
    else:
      return None

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)

    network_name = self.cfg.GetClusterInfo().instance_communication_network
    if network_name:
      return self._ConnectInstanceCommunicationNetwork(self.cfg,
                                                       self.group_uuid,
                                                       network_name)


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes, and the *destination* group. To gather the
    # list of "source" groups, we need to fetch node information later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet. Needs verification later in the code flow.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.node_uuids))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

  def CheckAssignmentForSplitInstances(self, changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split, and remain so after the change.

    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_uuid, new_group_uuid) pairs.
    @param changes: list of node assignments to consider.
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously okay and result split as a
      consequence of this change, and a list of instances that were previously
      split and this change does not fix.

    """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    for inst in instance_data.values():
      if inst.disk_template not in constants.DTS_INT_MIRROR:
        continue

      inst_nodes = self.cfg.GetInstanceNodes(inst.uuid)
      if len(set(node_data[node_uuid].group
                 for node_uuid in inst_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))


class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once group lock has
      # been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

  def _CheckIpolicy(self, cluster, owned_instance_names):
    """Sanity checks for the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type owned_instance_names: list of string
    @param owned_instance_names: list of instances

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
                                           self.op.ipolicy,
                                           group_policy=True)

      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      CheckIpolicyVsDiskTemplates(new_ipolicy,
                                  cluster.enabled_disk_templates)
      instances = \
        dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
      gmi = ganeti.masterd.instance
      violations = \
        ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                               self.group),
                                     new_ipolicy, instances.values(),
                                     self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(violations))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams
      # For each disktemplate subdict update and verify the values
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)
      # As we have all subdicts of diskparams ready, let's merge the actual
      # dict with all updated subdicts
      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.uuid for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                                 .uuid))
      else:
        # No target groups, need to lock all of them
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be evacuated and target groups
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, return it unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]

    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],

      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
      # starts one instance of this opcode for every group, which means all
      # nodes will be locked for a short amount of time, so it's better to
      # acquire the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(
                 self.cfg.GetInstanceInfoByName(instance_name).uuid)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_node_uuids = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      self.cfg,
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      # any leftover items in node_lv_to_inst are missing LVs, let's arrange
      # the data better
      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
        continue

      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(inst.uuid)
      for node_uuid in itertools.chain([inst.primary_node],
                                       secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(self.cfg.GetInstanceDisks(inst.uuid), inst)
                    for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        disks = self.cfg.GetInstanceDisks(inst.uuid)
        inst_disk_uuids = set([disk.uuid for disk in disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
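
The logical units above are not called directly; the master daemon dispatches them when the corresponding OpGroup* opcodes are submitted as jobs. As an informal illustration only (not part of ganeti/cmdlib/group.py), the sketch below shows roughly how a client might drive LUGroupAdd and LUGroupEvacuate; it assumes the usual ganeti.cli/ganeti.opcodes client-side interfaces, and exact opcode parameters may differ between Ganeti versions.

# Illustrative client-side sketch, not part of the module above.
from ganeti import cli
from ganeti import constants
from ganeti import opcodes

def add_and_evacuate_group(old_group, new_group):
  """Submit jobs that create a new group and evacuate an old one into it."""
  client = cli.GetClient()  # LUXI client talking to the local master daemon

  # Handled by LUGroupAdd (hooks path "group-add").
  add_op = opcodes.OpGroupAdd(group_name=new_group,
                              alloc_policy=constants.ALLOC_POLICY_PREFERRED)
  add_job = client.SubmitJob([add_op])

  # Handled by LUGroupEvacuate, which asks the iallocator for per-instance
  # move jobs and returns them via ResultWithJobs.
  evac_op = opcodes.OpGroupEvacuate(group_name=old_group,
                                    target_groups=[new_group])
  evac_job = client.SubmitJob([evac_op])

  return add_job, evac_job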