
Source Code for Module ganeti.cmdlib.group

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with node groups."""

import itertools
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import MergeAndVerifyHvState, \
  MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
  CheckNodeGroupInstances, GetUpdatedIPolicy, \
  ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
  CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, ConnectInstanceCommunicationNetworkOp

import ganeti.masterd.instance
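
# Editorial note (not part of the original module): this module collects the
# node-group logical units: LUGroupAdd, LUGroupAssignNodes, LUGroupSetParams,
# LUGroupRemove, LUGroupRename, LUGroupEvacuate and LUGroupVerifyDisks.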


class LUGroupAdd(LogicalUnit):
  """Logical unit for creating node groups.

  """
  HPATH = "group-add"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # We need the new group's UUID here so that we can create and acquire the
    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
    # that it should not check whether the UUID exists in the configuration.
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def _CheckIpolicy(self):
    """Checks the group's ipolicy for consistency and validity.

    """
    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)
      CheckIpolicyVsDiskTemplates(full_ipolicy,
                                  cluster.enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name is not an existing node group
    already.

    """
    try:
      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (self.op.group_name, existing_uuid),
                                 errors.ECODE_EXISTS)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
    else:
      self.new_hv_state = None

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
    else:
      self.new_disk_state = None

    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    self._CheckIpolicy()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  @staticmethod
  def _ConnectInstanceCommunicationNetwork(cfg, group_uuid, network_name):
    """Connect a node group to the instance communication network.

    The group is connected to the instance communication network via
    the Opcode 'OpNetworkConnect'.

    @type cfg: L{ganeti.config.ConfigWriter}
    @param cfg: Ganeti configuration

    @type group_uuid: string
    @param group_uuid: UUID of the group to connect

    @type network_name: string
    @param network_name: name of the network to connect to

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the group needs to be
             connected, otherwise (the group is already connected)
             L{None}

    """
    try:
      cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if network_exists:
      op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
      return ResultWithJobs([[op]])
    else:
      return None

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)

    network_name = self.cfg.GetClusterInfo().instance_communication_network
    if network_name:
      return self._ConnectInstanceCommunicationNetwork(self.cfg,
                                                       self.group_uuid,
                                                       network_name)
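
# Editorial note (not part of the original module): as with all LUs, this one
# is not called directly; by Ganeti's naming convention the OpGroupAdd opcode
# (typically submitted by "gnt-group add" or via RAPI) is dispatched to
# LUGroupAdd, which runs ExpandNames/CheckPrereq/Exec under the declared
# locks.  Returning a ResultWithJobs from Exec(), as done above for the
# instance communication network, asks the job queue to submit the contained
# opcodes as follow-up jobs rather than executing them inline.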


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes, and the *destination* group. To gather the
    # list of "source" groups, we need to fetch node information later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet. Needs verification later in the code flow.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.node_uuids))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

  def CheckAssignmentForSplitInstances(self, changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split, and remain so after the change.

    Only disks whose template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_uuid, new_group_uuid) pairs.
    @param changes: list of node assignments to consider.
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously okay and result split as a
      consequence of this change, and a list of instances that were previously
      split and this change does not fix.

    """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    for inst in instance_data.values():
      inst_disks = self.cfg.GetInstanceDisks(inst.uuid)
      if not utils.AnyDiskOfType(inst_disks, constants.DTS_INT_MIRROR):
        continue

      inst_nodes = self.cfg.GetInstanceNodes(inst.uuid)
      if len(set(node_data[node_uuid].group
                 for node_uuid in inst_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))
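
# Editorial note (not part of the original module): the split-instance check
# above boils down to comparing the set of node groups spanned by an
# instance's nodes before and after the proposed assignment.  The following
# minimal, self-contained sketch reproduces that set logic with plain
# dictionaries; all names and values are hypothetical sample data.
def _ExampleSplitCheck():
  """Illustrate the split detection above on hard-coded sample data."""
  node_group = {"node1": "g1", "node2": "g1", "node3": "g2"}
  instance_nodes = {
    "inst1": ["node1", "node2"],  # mirrored pair fully inside group g1
    "inst2": ["node2", "node3"],  # already split between g1 and g2
    }
  changes = {"node2": "g2"}  # proposed assignment: move node2 into g2

  all_split = set()
  previously_split = set()
  for name, nodes in instance_nodes.items():
    if len(set(node_group[n] for n in nodes)) > 1:
      previously_split.add(name)
    if len(set(changes.get(n, node_group[n]) for n in nodes)) > 1:
      all_split.add(name)

  # inst1 becomes newly split (node1 stays in g1, node2 moves to g2), while
  # inst2 was split before but is healed by the change, so it shows up in
  # neither list: the result is (["inst1"], [])
  return (sorted(all_split - previously_split),
          sorted(previously_split & all_split))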


class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once group lock has
      # been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

  def _CheckIpolicy(self, cluster, owned_instance_names):
    """Sanity checks for the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type owned_instance_names: list of string
    @param owned_instance_names: list of instances

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
                                           self.op.ipolicy,
                                           group_policy=True)

      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      CheckIpolicyVsDiskTemplates(new_ipolicy,
                                  cluster.enabled_disk_templates)
      instances = \
        dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
      gmi = ganeti.masterd.instance
      violations = \
        ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                               self.group),
                                     new_ipolicy, instances.values(),
                                     self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(violations))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams
      # For each disktemplate subdict update and verify the values
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)
      # As we've all subdicts of diskparams ready, lets merge the actual
      # dict with all updated subdicts
      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result
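
# Editorial note (not part of the original module): Exec() above returns a
# list of (parameter name, new value) pairs, which becomes the job result
# reported back to the caller.  For disk parameters, each per-template subdict
# is merged first via _UpdateAndVerifyDiskParams and the result is then folded
# over the existing group diskparams with objects.FillDict, a shallow
# copy-and-update; with hypothetical values:
#   objects.FillDict({"drbd": {"resync-rate": 1024}},
#                    {"drbd": {"resync-rate": 2048}})
#     == {"drbd": {"resync-rate": 2048}}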


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This will raise errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that is
    empty (i.e., contains no nodes), and that is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.uuid for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                                 .uuid))
      else:
        # No target groups, need to lock all of them
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be evacuated and target groups
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, return it unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]

    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)
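
# Editorial note (not part of the original module): the "sequential" handling
# in LUGroupEvacuate.Exec relies on relative job dependencies: a negative job
# ID in an opcode's "depends" attribute refers to a job submitted earlier in
# the same submission (-1 being the immediately preceding one), and the list
# names the final job statuses that satisfy the dependency.  With
# [(-1, ["error", "success"])] each job therefore waits for the previous one
# to finish, whether it succeeded or failed.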


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }
    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(
                 self.cfg.GetInstanceInfoByName(instance_name).uuid)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_node_uuids = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      self.cfg,
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      disks = self.cfg.GetInstanceDisks(inst.uuid)
      if not (inst.disks_active and
              utils.AnyDiskOfType(disks, [constants.DT_DRBD8])):
        continue

      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(inst.uuid)
      for node_uuid in itertools.chain([inst.primary_node],
                                       secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(self.cfg.GetInstanceDisks(inst.uuid), inst)
                    for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        disks = self.cfg.GetInstanceDisks(inst.uuid)
        inst_disk_uuids = set([disk.uuid for disk in disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
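
# Editorial note (not part of the original module): a possible, purely
# hypothetical Exec() result of LUGroupVerifyDisks, illustrating the tuple
# documented above:
#   ({"node-uuid-a": "Error enumerating LVs: ..."},     # per-node RPC errors
#    ["instance2"],                                     # needs activate-disks
#    {"instance3": [["node-uuid-b", "xenvg/disk0"]]})   # missing LVs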