31 """Logical units dealing with node groups."""
32
33 import itertools
34 import logging
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import objects
40 from ganeti import opcodes
41 from ganeti import utils
42 from ganeti.masterd import iallocator
43 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
44 from ganeti.cmdlib.common import MergeAndVerifyHvState, \
45 MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
46 CheckNodeGroupInstances, GetUpdatedIPolicy, \
47 ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
48 CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
49 CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
50 CheckDiskAccessModeConsistency
51
52 import ganeti.masterd.instance
56 """Logical unit for creating node groups.
57
58 """
59 HPATH = "group-add"
60 HTYPE = constants.HTYPE_GROUP
61 REQ_BGL = False
62
64
65
66
67 self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
68 self.needed_locks = {}
69 self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
70
85
131
133 """Build hooks env.
134
135 """
136 return {
137 "GROUP_NAME": self.op.group_name,
138 }
139
141 """Build hooks nodes.
142
143 """
144 mn = self.cfg.GetMasterNode()
145 return ([mn], [mn])
146
147 - def Exec(self, feedback_fn):
148 """Add the node group to the cluster.
149
150 """
151 group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
152 uuid=self.group_uuid,
153 alloc_policy=self.op.alloc_policy,
154 ndparams=self.op.ndparams,
155 diskparams=self.new_diskparams,
156 ipolicy=self.op.ipolicy,
157 hv_state_static=self.new_hv_state,
158 disk_state_static=self.new_disk_state)
159
160 self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
161 del self.remove_locks[locking.LEVEL_NODEGROUP]
162
165 """Logical unit for assigning nodes to groups.
166
167 """
168 REQ_BGL = False
169
171
172 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
173 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
174
175
176
177
178 self.needed_locks = {
179 locking.LEVEL_NODEGROUP: set([self.group_uuid]),
180 locking.LEVEL_NODE: self.op.node_uuids,
181 }
182
184 if level == locking.LEVEL_NODEGROUP:
185 assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
186
187
188
189 groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)
190
191 self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
192
194 """Check prerequisites.
195
196 """
197 assert self.needed_locks[locking.LEVEL_NODEGROUP]
198 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
199 frozenset(self.op.node_uuids))
200
201 expected_locks = (set([self.group_uuid]) |
202 self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
203 actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
204 if actual_locks != expected_locks:
205 raise errors.OpExecError("Nodes changed groups since locks were acquired,"
206 " current groups are '%s', used to be '%s'" %
207 (utils.CommaJoin(expected_locks),
208 utils.CommaJoin(actual_locks)))
209
210 self.node_data = self.cfg.GetAllNodesInfo()
211 self.group = self.cfg.GetNodeGroup(self.group_uuid)
212 instance_data = self.cfg.GetAllInstancesInfo()
213
214 if self.group is None:
215 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
216 (self.op.group_name, self.group_uuid))
217
218 (new_splits, previous_splits) = \
219 self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
220 for uuid in self.op.node_uuids],
221 self.node_data, instance_data)
222
223 if new_splits:
224 fmt_new_splits = utils.CommaJoin(utils.NiceSort(
225 self.cfg.GetInstanceNames(new_splits)))
226
227 if not self.op.force:
228 raise errors.OpExecError("The following instances get split by this"
229 " change and --force was not given: %s" %
230 fmt_new_splits)
231 else:
232 self.LogWarning("This operation will split the following instances: %s",
233 fmt_new_splits)
234
235 if previous_splits:
236 self.LogWarning("In addition, these already-split instances continue"
237 " to be split across groups: %s",
238 utils.CommaJoin(utils.NiceSort(
239 self.cfg.GetInstanceNames(previous_splits))))
240
241 - def Exec(self, feedback_fn):
242 """Assign nodes to a new group.
243
244 """
245 mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]
246
247 self.cfg.AssignGroupNodes(mods)
248
249 @staticmethod
251 """Check for split instances after a node assignment.
252
253 This method considers a series of node assignments as an atomic operation,
254 and returns information about split instances after applying the set of
255 changes.
256
257 In particular, it returns information about newly split instances, and
258 instances that were already split, and remain so after the change.
259
260 Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
261 considered.
262
263 @type changes: list of (node_uuid, new_group_uuid) pairs.
264 @param changes: list of node assignments to consider.
265 @param node_data: a dict with data for all nodes
266 @param instance_data: a dict with all instances to consider
267 @rtype: a two-tuple
268 @return: a list of instances that were previously okay and result split as a
269 consequence of this change, and a list of instances that were previously
270 split and this change does not fix.
271
272 """
273 changed_nodes = dict((uuid, group) for uuid, group in changes
274 if node_data[uuid].group != group)
275
276 all_split_instances = set()
277 previously_split_instances = set()
278
279 for inst in instance_data.values():
280 if inst.disk_template not in constants.DTS_INT_MIRROR:
281 continue
282
283 if len(set(node_data[node_uuid].group
284 for node_uuid in inst.all_nodes)) > 1:
285 previously_split_instances.add(inst.uuid)
286
287 if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
288 for node_uuid in inst.all_nodes)) > 1:
289 all_split_instances.add(inst.uuid)
290
291 return (list(all_split_instances - previously_split_instances),
292 list(previously_split_instances & all_split_instances))
293
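
# Illustrative sketch (not part of the original module, hypothetical names):
# given node_data = {"n1": node_in_g1, "n2": node_in_g1} and a DRBD instance
# mirrored on ("n1", "n2"), the assignment changes = [("n2", "g2")] makes the
# instance span two groups where it previously spanned one, so its UUID is
# returned in the first list (newly split). Had the two nodes started out in
# different groups, the instance would instead be reported in the second list
# (previously split and not fixed by the change), while a change that brings
# all of its nodes into one group reports it in neither list.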
296 """Modifies the parameters of a node group.
297
298 """
299 HPATH = "group-modify"
300 HTYPE = constants.HTYPE_GROUP
301 REQ_BGL = False
302
304 all_changes = [
305 self.op.ndparams,
306 self.op.diskparams,
307 self.op.alloc_policy,
308 self.op.hv_state,
309 self.op.disk_state,
310 self.op.ipolicy,
311 ]
312
313 if all_changes.count(None) == len(all_changes):
314 raise errors.OpPrereqError("Please pass at least one modification",
315 errors.ECODE_INVAL)
316
317 if self.op.diskparams:
318 CheckDiskAccessModeValidity(self.op.diskparams)
319
321
322 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
323
324 self.needed_locks = {
325 locking.LEVEL_INSTANCE: [],
326 locking.LEVEL_NODEGROUP: [self.group_uuid],
327 }
328
329 self.share_locks[locking.LEVEL_INSTANCE] = 1
330

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    # Merge the new values into the existing per-template dict and enforce
    # the expected value types (helper body not shown in the original
    # excerpt; this is the assumed behaviour based on how it is called below)
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

382 """Check prerequisites.
383
384 """
385 owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
386
387
388 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)
389
390 self.group = self.cfg.GetNodeGroup(self.group_uuid)
391 cluster = self.cfg.GetClusterInfo()
392
393 if self.group is None:
394 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
395 (self.op.group_name, self.group_uuid))
396
397 if self.op.ndparams:
398 new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
399 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
400 self.new_ndparams = new_ndparams
401
402 if self.op.diskparams:
403 diskparams = self.group.diskparams
404 uavdp = self._UpdateAndVerifyDiskParams
405
406 new_diskparams = dict((dt,
407 uavdp(diskparams.get(dt, {}),
408 self.op.diskparams[dt]))
409 for dt in constants.DISK_TEMPLATES
410 if dt in self.op.diskparams)
411
412
413 self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
414
415 try:
416 utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
417 CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
418 group=self.group)
419 except errors.OpPrereqError, err:
420 raise errors.OpPrereqError("While verify diskparams options: %s" % err,
421 errors.ECODE_INVAL)
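
      # Hypothetical example of the merge above: if the group's diskparams are
      # {"drbd": {"metavg": "xenvg"}} and the opcode passes
      # {"drbd": {"resync-rate": 2048}}, the "drbd" sub-dict is updated rather
      # than replaced, so the result keeps "metavg" and gains "resync-rate",
      # while disk templates not mentioned in the opcode keep their previous
      # parameters via the FillDict merge.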

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left without any node group
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))

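    # Once the group is gone from the configuration, the now-stale node-group
    # lock is handed over to the LU machinery for removal via remove_locks
    # (the counterpart of the add_locks bookkeeping used when creating groups)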
    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock the instances of the evacuated group optimistically; the set is
      # verified again once the group lock is held (in CheckPrereq)
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # In addition, lock the node groups used by the instances whose locks
        # we already hold
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                               .uuid))
      else:
        # No target groups requested, all of them have to be locked
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # Lock the nodes used by the locked instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Also lock all member nodes of the owned groups (the group being
      # evacuated and the target groups)
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

    if not self.target_uuids:
      raise errors.OpPrereqError("There are no possible target groups",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, return it unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]
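
    # In Ganeti's job dependency mechanism a negative job ID in a "depends"
    # entry is resolved relative to the jobs submitted alongside it (per the
    # chained-jobs design), so the (-1, [...]) dependency below should make
    # each job wait for the previously submitted one to reach a final status,
    # whether "error" or "success"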
    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)
798 """Verifies the status of all disks in a node group.
799
800 """
801 REQ_BGL = False
802
819
854
856 owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
857 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
858 owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))
859
860 assert self.group_uuid in owned_groups
861
862
863 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)
864
865
866 self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))
867
868
869 CheckInstancesNodeGroups(self.cfg, self.instances,
870 owned_groups, owned_node_uuids, self.group_uuid)
871
872 - def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
873 missing_disks):
874 node_lv_to_inst = MapInstanceLvsToNodes(
875 [inst for inst in self.instances.values() if inst.disks_active])
876 if node_lv_to_inst:
877 node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
878 set(self.cfg.GetVmCapableNodeList()))
879
880 node_lvs = self.rpc.call_lv_list(node_uuids, [])
881
882 for (node_uuid, node_res) in node_lvs.items():
883 if node_res.offline:
884 continue
885
886 msg = node_res.fail_msg
887 if msg:
888 logging.warning("Error enumerating LVs on node %s: %s",
889 self.cfg.GetNodeName(node_uuid), msg)
890 node_errors[node_uuid] = msg
891 continue
892
893 for lv_name, (_, _, lv_online) in node_res.payload.items():
894 inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
895 if not lv_online and inst is not None:
896 offline_disk_instance_names.add(inst.name)
897
898
899
900 for key, inst in node_lv_to_inst.iteritems():
901 missing_disks.setdefault(inst.name, []).append(list(key))
902
904 node_to_inst = {}
905 for inst in self.instances.values():
906 if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
907 continue
908
909 for node_uuid in itertools.chain([inst.primary_node],
910 inst.secondary_nodes):
911 node_to_inst.setdefault(node_uuid, []).append(inst)
912
913 for (node_uuid, insts) in node_to_inst.items():
914 node_disks = [(inst.disks, inst) for inst in insts]
915 node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
916 msg = node_res.fail_msg
917 if msg:
918 logging.warning("Error getting DRBD status on node %s: %s",
919 self.cfg.GetNodeName(node_uuid), msg)
920 node_errors[node_uuid] = msg
921 continue
922
923 faulty_disk_uuids = set(node_res.payload)
924 for inst in self.instances.values():
925 inst_disk_uuids = set([disk.uuid for disk in inst.disks])
926 if inst_disk_uuids.intersection(faulty_disk_uuids):
927 offline_disk_instance_names.add(inst.name)
928
929 - def Exec(self, feedback_fn):
930 """Verify integrity of cluster disks.
931
932 @rtype: tuple of three items
933 @return: a tuple of (dict of node-to-node_error, list of instances
934 which need activate-disks, dict of instance: (node, volume) for
935 missing volumes
936
937 """
938 node_errors = {}
939 offline_disk_instance_names = set()
940 missing_disks = {}
941
942 self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
943 missing_disks)
944 self._VerifyDrbdStates(node_errors, offline_disk_instance_names)
945
946 return (node_errors, list(offline_disk_instance_names), missing_disks)
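
# Purely illustrative sketch (hypothetical identifiers) of the tuple shape
# returned by LUGroupVerifyDisks.Exec above:
#
#   ({"node-uuid-1": "Error enumerating LVs: ..."},   # per-node RPC errors
#    ["inst1.example.com"],                           # need activate-disks
#    {"inst2.example.com": [["node-uuid-2", "xenvg/disk0"]]})  # missing LVs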