
Source Code for Module ganeti.cmdlib.group

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with node groups."""

import itertools
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import MergeAndVerifyHvState, \
  MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
  CheckNodeGroupInstances, GetUpdatedIPolicy, \
  ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
  CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUGroupAdd(LogicalUnit):
  """Logical unit for creating node groups.

  """
  HPATH = "group-add"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # We need the new group's UUID here so that we can create and acquire the
    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
    # that it should not check whether the UUID exists in the configuration.
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def _CheckIpolicy(self):
    """Checks the group's ipolicy for consistency and validity.

    """
    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)
      CheckIpolicyVsDiskTemplates(full_ipolicy,
                                  cluster.enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name is not already used by an existing
    node group.

    """
    try:
      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (self.op.group_name, existing_uuid),
                                 errors.ECODE_EXISTS)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
    else:
      self.new_hv_state = None

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
    else:
      self.new_disk_state = None

    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    self._CheckIpolicy()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
    del self.remove_locks[locking.LEVEL_NODEGROUP]

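# Illustrative note (not part of the original module): logical units such as
# LUGroupAdd are not instantiated directly; they are normally driven by
# submitting the matching opcode through the job queue.  A rough sketch,
# assuming the standard opcode and CLI helpers are available:
#
#   from ganeti import cli, constants, opcodes
#
#   op = opcodes.OpGroupAdd(group_name="rack1",
#                           alloc_policy=constants.ALLOC_POLICY_PREFERRED)
#   cli.SubmitOpCode(op)  # the master daemon then executes LUGroupAdd
#
# The group name "rack1" and the exact submission helper are placeholders for
# this example only.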

class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes, and the *destination* group. To gather the
    # list of "source" groups, we need to fetch node information later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet. Needs verification later in the code flow.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.node_uuids))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

  @staticmethod
  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split, and remain so after the change.

    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_uuid, new_group_uuid) pairs.
    @param changes: list of node assignments to consider.
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously okay and become split as
      a consequence of this change, and a list of instances that were
      previously split and that this change does not fix.

    """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    for inst in instance_data.values():
      if inst.disk_template not in constants.DTS_INT_MIRROR:
        continue

      if len(set(node_data[node_uuid].group
                 for node_uuid in inst.all_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst.all_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))

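# Illustrative note (not part of the original module): a small worked example
# of CheckAssignmentForSplitInstances().  Assume nodes "n1" and "n2" both sit
# in group "g1" and a DRBD-mirrored instance uses (n1, n2) as its nodes.
# Passing changes=[("n1", "g2")] would make the instance's nodes span two
# groups, so its UUID ends up in the first returned list ("newly split").
# Had the instance already spanned two groups before the change, and still
# afterwards, it would instead appear in the second list ("previously split
# and not fixed by this change").  The node and group names above are
# invented for the example.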

class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once group lock has
      # been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

  def _CheckIpolicy(self, cluster, owned_instance_names):
    """Sanity checks for the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type owned_instance_names: list of string
    @param owned_instance_names: list of instances

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
                                           self.op.ipolicy,
                                           group_policy=True)

      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      CheckIpolicyVsDiskTemplates(new_ipolicy,
                                  cluster.enabled_disk_templates)
      instances = \
        dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
      gmi = ganeti.masterd.instance
      violations = \
        ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                               self.group),
                                     new_ipolicy, instances.values(),
                                     self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(violations))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams
      # For each disk template subdict, update and verify the values
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)
      # Now that all subdicts of diskparams are ready, merge the actual
      # dict with all updated subdicts
      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result

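# Illustrative note (not part of the original module): the disk-parameter
# handling in LUGroupSetParams.CheckPrereq() above is layered.  For each disk
# template, GetUpdatedParams() applies the opcode's changes on top of the
# group's current values, and objects.FillDict() then merges the updated
# sub-dicts back into the complete diskparams dict.  A rough sketch, with
# parameter names and values invented for the example:
#
#   group.diskparams = {constants.DT_DRBD8: {"metavg": "xenvg"}}
#   op.diskparams    = {constants.DT_DRBD8: {"resync-rate": 61440}}
#   # -> new_diskparams[constants.DT_DRBD8] would contain both keys
#
# The authoritative parameter names and defaults live in
# constants.DISK_DT_TYPES and constants.DISK_DT_DEFAULTS.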

class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))

    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.uuid for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                                 .uuid))
      else:
        # No target groups, need to lock all of them
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be evacuated and target groups
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, return it unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]

    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)

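# Illustrative note (not part of the original module): the "sequential" branch
# in LUGroupEvacuate.Exec() above chains the returned jobs through relative
# job dependencies.  Roughly, a negative job ID in an opcode's "depends" entry
# refers to a job submitted earlier in the same submission (-1 being the one
# submitted immediately before), and the status list names the final job
# states that satisfy the dependency, so (-1, ["error", "success"]) makes each
# evacuation job wait for the previous one regardless of its outcome.  This
# description is a reading of the code and Ganeti's job-dependency design, not
# something stated in this module itself.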

class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],

      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
      # starts one instance of this opcode for every group, which means all
      # nodes will be locked for a short amount of time, so it's better to
      # acquire the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(
                 self.cfg.GetInstanceInfoByName(instance_name).uuid)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_node_uuids = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      # any leftover items in node_lv_to_inst are missing LVs, let's arrange
      # the data better
      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
        continue

      for node_uuid in itertools.chain([inst.primary_node],
                                       inst.secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(inst.disks, inst) for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        inst_disk_uuids = set([disk.uuid for disk in inst.disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
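# Illustrative note (not part of the original module): a sketch of the shape
# of LUGroupVerifyDisks.Exec()'s return value, following its docstring and the
# way _VerifyInstanceLvs() fills missing_disks (concrete names invented for
# the example):
#
#   ({"node-uuid-1": "rpc error message"},              # per-node errors
#    ["instance3"],                                     # need activate-disks
#    {"instance2": [["node-uuid-2", "xenvg/disk0"]]})   # missing volumes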