31 """Logical units dealing with node groups."""
32
33 import itertools
34 import logging
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import objects
40 from ganeti import opcodes
41 from ganeti import utils
42 from ganeti.masterd import iallocator
43 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
44 from ganeti.cmdlib.common import MergeAndVerifyHvState, \
45 MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
46 CheckNodeGroupInstances, GetUpdatedIPolicy, \
47 ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
48 CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
49 CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
50 CheckDiskAccessModeConsistency, ConnectInstanceCommunicationNetworkOp
51
52 import ganeti.masterd.instance
56 """Logical unit for creating node groups.
57
58 """
59 HPATH = "group-add"
60 HTYPE = constants.HTYPE_GROUP
61 REQ_BGL = False

  def ExpandNames(self):
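    # Generate the new group's UUID up front: it is needed both for the
    # NODEGROUP add-lock below and for the group object created in Exec().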
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

133 """Build hooks env.
134
135 """
136 return {
137 "GROUP_NAME": self.op.group_name,
138 }
139
141 """Build hooks nodes.
142
143 """
144 mn = self.cfg.GetMasterNode()
145 return ([mn], [mn])
146
  @staticmethod
  def _ConnectInstanceCommunicationNetwork(cfg, group_uuid, network_name):
    """Connect a node group to the instance communication network.

    The group is connected to the instance communication network via
    the Opcode 'OpNetworkConnect'.

    @type cfg: L{ganeti.config.ConfigWriter}
    @param cfg: Ganeti configuration

    @type group_uuid: string
    @param group_uuid: UUID of the group to connect

    @type network_name: string
    @param network_name: name of the network to connect to

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the group needs to be
             connected, otherwise (the group is already connected)
             L{None}

    """
    try:
      cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if network_exists:
      op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
      return ResultWithJobs([[op]])
    else:
      return None

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)

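    # If the cluster defines an instance communication network, hand back a
    # job that connects the freshly added group to it.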
    network_name = self.cfg.GetClusterInfo().instance_communication_network
    if network_name:
      return self._ConnectInstanceCommunicationNetwork(self.cfg,
                                                       self.group_uuid,
                                                       network_name)


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

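    # Lock the destination group and all affected nodes here; the nodes'
    # current ("source") groups are added to the lock set in DeclareLocks,
    # once node information can be fetched.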
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

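      # Look up the groups the affected nodes currently belong to, so their
      # locks can be acquired as well; this is re-verified in CheckPrereq
      # after the locks have actually been obtained.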
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

233 """Check prerequisites.
234
235 """
236 assert self.needed_locks[locking.LEVEL_NODEGROUP]
237 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
238 frozenset(self.op.node_uuids))
239
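    # Nodes may have changed groups between DeclareLocks and lock
    # acquisition; make sure the group locks held still cover all of them.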
    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

289 """Check for split instances after a node assignment.
290
291 This method considers a series of node assignments as an atomic operation,
292 and returns information about split instances after applying the set of
293 changes.
294
295 In particular, it returns information about newly split instances, and
296 instances that were already split, and remain so after the change.
297
298 Only disks whose template is listed in constants.DTS_INT_MIRROR are
299 considered.
300
301 @type changes: list of (node_uuid, new_group_uuid) pairs.
302 @param changes: list of node assignments to consider.
303 @param node_data: a dict with data for all nodes
304 @param instance_data: a dict with all instances to consider
305 @rtype: a two-tuple
306 @return: a list of instances that were previously okay and result split as a
307 consequence of this change, and a list of instances that were previously
308 split and this change does not fix.
309
310 """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

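    # An instance is considered split when the nodes holding its internally
    # mirrored disks (constants.DTS_INT_MIRROR) end up in more than one group.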
    for inst in instance_data.values():
      inst_disks = self.cfg.GetInstanceDisks(inst.uuid)
      if not utils.AnyDiskOfType(inst_disks, constants.DTS_INT_MIRROR):
        continue

      inst_nodes = self.cfg.GetInstanceNodes(inst.uuid)
      if len(set(node_data[node_uuid].group
                 for node_uuid in inst_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))

336 """Modifies the parameters of a node group.
337
338 """
339 HPATH = "group-modify"
340 HTYPE = constants.HTYPE_GROUP
341 REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

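    # Instance locks are taken shared: instance data is only read here (for
    # the ipolicy checks in CheckPrereq), never modified.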
    self.share_locks[locking.LEVEL_INSTANCE] = 1

422 """Check prerequisites.
423
424 """
425 owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
426
427
428 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)
429
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams

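      # Merge the requested per-disk-template changes over the group's
      # existing disk parameters.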
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)

      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

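      # Validate the merged disk parameters and make sure the disk access
      # modes remain consistent for this group.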
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

475 """Build hooks env.
476
477 """
478 return {
479 "GROUP_NAME": self.op.group_name,
480 "NEW_ALLOC_POLICY": self.op.alloc_policy,
481 }
482
484 """Build hooks nodes.
485
486 """
487 mn = self.cfg.GetMasterNode()
488 return ([mn], [mn])
489
  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

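    # Apply each requested change and record ("parameter", new value) pairs
    # so they can be reported back as feedback.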
    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """

    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

559 """Build hooks env.
560
561 """
562 return {
563 "GROUP_NAME": self.op.group_name,
564 }
565
567 """Build hooks nodes.
568
569 """
570 mn = self.cfg.GetMasterNode()
571 return ([mn], [mn])
572
  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

614 """Build hooks env.
615
616 """
617 return {
618 "OLD_NAME": self.op.group_name,
619 "NEW_NAME": self.op.new_name,
620 }
621
  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

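    # Fall back to the cluster's default iallocator if none was given in the
    # opcode.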
    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

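    # Determine the candidate target groups: either the explicitly requested
    # ones, or every other group whose lock is already held.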
    if self.req_target_uuids:
      self.target_uuids = self.req_target_uuids
    else:
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

    if not self.target_uuids:
      raise errors.OpPrereqError("There are no possible target groups",
                                 errors.ECODE_INVAL)

762 """Build hooks env.
763
764 """
765 return {
766 "GROUP_NAME": self.op.group_name,
767 "TARGET_GROUPS": " ".join(self.target_uuids),
768 }
769
771 """Build hooks nodes.
772
773 """
774 mn = self.cfg.GetMasterNode()
775
776 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
777
778 run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
779
780 return (run_nodes, run_nodes)
781
  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, return it unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]

    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
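      # Make each job depend on the one submitted before it (relative job ID
      # -1), accepting both "success" and "error", so the jobs effectively
      # run one after another.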
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)

836 """Verifies the status of all disks in a node group.
837
838 """
839 REQ_BGL = False
840
  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }
    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
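    # Map every (node UUID, LV name) pair to its instance, considering only
    # instances whose disks are expected to be active.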
    node_lv_to_inst = MapInstanceLvsToNodes(
      self.cfg,
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

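    # Anything still left in node_lv_to_inst was not reported by any node,
    # i.e. the corresponding LVs are missing.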
    for key, inst in node_lv_to_inst.iteritems():
      missing_disks.setdefault(inst.name, []).append(list(key))

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      disks = self.cfg.GetInstanceDisks(inst.uuid)
      if not (inst.disks_active and
              utils.AnyDiskOfType(disks, [constants.DT_DRBD8])):
        continue

      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(inst.uuid)
      for node_uuid in itertools.chain([inst.primary_node],
                                       secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

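    # Ask every involved node which of these DRBD disks would need to be
    # activated; any instance owning such a disk is reported as having
    # offline disks.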
    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(self.cfg.GetInstanceDisks(inst.uuid), inst)
                    for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        disks = self.cfg.GetInstanceDisks(inst.uuid)
        inst_disk_uuids = set([disk.uuid for disk in disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
      which need activate-disks, dict of instance: (node, volume) for
      missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)