31 """Logical units dealing with node groups."""
32
33 import itertools
34 import logging
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import objects
40 from ganeti import opcodes
41 from ganeti import utils
42 from ganeti.masterd import iallocator
43 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
44 from ganeti.cmdlib.common import MergeAndVerifyHvState, \
45 MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
46 CheckNodeGroupInstances, GetUpdatedIPolicy, \
47 ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
48 CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
49 CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
50 CheckDiskAccessModeConsistency, ConnectInstanceCommunicationNetworkOp
51
52 import ganeti.masterd.instance
56 """Logical unit for creating node groups.
57
58 """
59 HPATH = "group-add"
60 HTYPE = constants.HTYPE_GROUP
61 REQ_BGL = False
62
  def ExpandNames(self):
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

141 """Build hooks nodes.
142
143 """
144 mn = self.cfg.GetMasterNode()
145 return ([mn], [mn])
146
  @staticmethod
  def _ConnectInstanceCommunicationNetwork(cfg, group_uuid, network_name):
    """Connect a node group to the instance communication network.

    The group is connected to the instance communication network via
    the Opcode 'OpNetworkConnect'.

    @type cfg: L{ganeti.config.ConfigWriter}
    @param cfg: Ganeti configuration

    @type group_uuid: string
    @param group_uuid: UUID of the group to connect

    @type network_name: string
    @param network_name: name of the network to connect to

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the group needs to be
             connected, otherwise (the group is already connected)
             L{None}

    """
    try:
      cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if network_exists:
      op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
      return ResultWithJobs([[op]])
    else:
      return None

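  # A minimal usage sketch (illustrative only, not part of the original code):
  # when the named network exists, the helper above returns a ResultWithJobs
  # holding one job with a single opcode; when cfg.LookupNetwork() raises
  # errors.OpPrereqError (network not defined), it returns None and nothing is
  # submitted.  The network name below is hypothetical.
  #
  #   result = LUGroupAdd._ConnectInstanceCommunicationNetwork(
  #       cfg, group_uuid, "gnt.example.instance-communication")
  #   if result is not None:
  #     pass  # the job queue submits the contained OpNetworkConnect job
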
  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)

    network_name = self.cfg.GetClusterInfo().instance_communication_network
    if network_name:
      return self._ConnectInstanceCommunicationNetwork(self.cfg,
                                                       self.group_uuid,
                                                       network_name)

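  # Hedged example (not part of the original module): this LU is normally
  # reached by submitting an OpGroupAdd opcode, roughly as sketched below; the
  # group name and allocation policy are made-up values.
  #
  #   op = opcodes.OpGroupAdd(group_name="rack1",
  #                           alloc_policy=constants.ALLOC_POLICY_PREFERRED)
  #   # e.g. what "gnt-group add --alloc-policy=preferred rack1" submits
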
204 """Logical unit for assigning nodes to groups.
205
206 """
207 REQ_BGL = False
208
  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

233 """Check prerequisites.
234
235 """
236 assert self.needed_locks[locking.LEVEL_NODEGROUP]
237 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
238 frozenset(self.op.node_uuids))
239
240 expected_locks = (set([self.group_uuid]) |
241 self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
242 actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
243 if actual_locks != expected_locks:
244 raise errors.OpExecError("Nodes changed groups since locks were acquired,"
245 " current groups are '%s', used to be '%s'" %
246 (utils.CommaJoin(expected_locks),
247 utils.CommaJoin(actual_locks)))
248
249 self.node_data = self.cfg.GetAllNodesInfo()
250 self.group = self.cfg.GetNodeGroup(self.group_uuid)
251 instance_data = self.cfg.GetAllInstancesInfo()
252
253 if self.group is None:
254 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
255 (self.op.group_name, self.group_uuid))
256
257 (new_splits, previous_splits) = \
258 self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
259 for uuid in self.op.node_uuids],
260 self.node_data, instance_data)
261
262 if new_splits:
263 fmt_new_splits = utils.CommaJoin(utils.NiceSort(
264 self.cfg.GetInstanceNames(new_splits)))
265
266 if not self.op.force:
267 raise errors.OpExecError("The following instances get split by this"
268 " change and --force was not given: %s" %
269 fmt_new_splits)
270 else:
271 self.LogWarning("This operation will split the following instances: %s",
272 fmt_new_splits)
273
274 if previous_splits:
275 self.LogWarning("In addition, these already-split instances continue"
276 " to be split across groups: %s",
277 utils.CommaJoin(utils.NiceSort(
278 self.cfg.GetInstanceNames(previous_splits))))
279
  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

289 """Check for split instances after a node assignment.
290
291 This method considers a series of node assignments as an atomic operation,
292 and returns information about split instances after applying the set of
293 changes.
294
295 In particular, it returns information about newly split instances, and
296 instances that were already split, and remain so after the change.
297
298 Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
299 considered.
300
301 @type changes: list of (node_uuid, new_group_uuid) pairs.
302 @param changes: list of node assignments to consider.
303 @param node_data: a dict with data for all nodes
304 @param instance_data: a dict with all instances to consider
305 @rtype: a two-tuple
306 @return: a list of instances that were previously okay and result split as a
307 consequence of this change, and a list of instances that were previously
308 split and this change does not fix.
309
310 """
311 changed_nodes = dict((uuid, group) for uuid, group in changes
312 if node_data[uuid].group != group)
313
314 all_split_instances = set()
315 previously_split_instances = set()
316
317 for inst in instance_data.values():
318 if inst.disk_template not in constants.DTS_INT_MIRROR:
319 continue
320
321 inst_nodes = self.cfg.GetInstanceNodes(inst.uuid)
322 if len(set(node_data[node_uuid].group
323 for node_uuid in inst_nodes)) > 1:
324 previously_split_instances.add(inst.uuid)
325
326 if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
327 for node_uuid in inst_nodes)) > 1:
328 all_split_instances.add(inst.uuid)
329
330 return (list(all_split_instances - previously_split_instances),
331 list(previously_split_instances & all_split_instances))
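
  # Worked example (illustrative, not part of the original code): assume a
  # DRBD instance with its primary on node A and its secondary on node B, both
  # currently in group G1.  For changes=[(B, G2)], changed_nodes is {B: G2},
  # the instance's node groups become {G1, G2}, so its UUID lands in
  # all_split_instances but not in previously_split_instances and it is
  # reported as newly split.  Had it already straddled two groups before the
  # change, it would instead appear only in the second returned list.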


class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

421 """Check prerequisites.
422
423 """
424 owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
425
426
427 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)
428
429 self.group = self.cfg.GetNodeGroup(self.group_uuid)
430 cluster = self.cfg.GetClusterInfo()
431
432 if self.group is None:
433 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
434 (self.op.group_name, self.group_uuid))
435
436 if self.op.ndparams:
437 new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
438 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
439 self.new_ndparams = new_ndparams
440
441 if self.op.diskparams:
442 diskparams = self.group.diskparams
443 uavdp = self._UpdateAndVerifyDiskParams
444
445 new_diskparams = dict((dt,
446 uavdp(diskparams.get(dt, {}),
447 self.op.diskparams[dt]))
448 for dt in constants.DISK_TEMPLATES
449 if dt in self.op.diskparams)
450
451
452 self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
453
454 try:
455 utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
456 CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
457 group=self.group)
458 except errors.OpPrereqError, err:
459 raise errors.OpPrereqError("While verify diskparams options: %s" % err,
460 errors.ECODE_INVAL)
461
462 if self.op.hv_state:
463 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
464 self.group.hv_state_static)
465
466 if self.op.disk_state:
467 self.new_disk_state = \
468 MergeAndVerifyDiskState(self.op.disk_state,
469 self.group.disk_state_static)
470
471 self._CheckIpolicy(cluster, owned_instance_names)
472
474 """Build hooks env.
475
476 """
477 return {
478 "GROUP_NAME": self.op.group_name,
479 "NEW_ALLOC_POLICY": self.op.alloc_policy,
480 }
481
483 """Build hooks nodes.
484
485 """
486 mn = self.cfg.GetMasterNode()
487 return ([mn], [mn])
488
  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result

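  # Illustrative note (not part of the original code): the list returned by
  # Exec() pairs each modified parameter group with its new value and becomes
  # the job result, e.g. a run that only changes ndparams might return
  # something like (values made up):
  #
  #   [("ndparams", "{'oob_program': '/bin/true'}")]
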

class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

532 """Check prerequisites.
533
534 This checks that the given group name exists as a node group, that is
535 empty (i.e., contains no nodes), and that is not the last group of the
536 cluster.
537
538 """
539
540 group_nodes = [node.uuid
541 for node in self.cfg.GetAllNodesInfo().values()
542 if node.group == self.group_uuid]
543
544 if group_nodes:
545 raise errors.OpPrereqError("Group '%s' not empty, has the following"
546 " nodes: %s" %
547 (self.op.group_name,
548 utils.CommaJoin(utils.NiceSort(group_nodes))),
549 errors.ECODE_STATE)
550
551
552 if len(self.cfg.GetNodeGroupList()) == 1:
553 raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
554 " removed" % self.op.group_name,
555 errors.ECODE_STATE)
556
558 """Build hooks env.
559
560 """
561 return {
562 "GROUP_NAME": self.op.group_name,
563 }
564
566 """Build hooks nodes.
567
568 """
569 mn = self.cfg.GetMasterNode()
570 return ([mn], [mn])
571
  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

597 """Check prerequisites.
598
599 Ensures requested new name is not yet used.
600
601 """
602 try:
603 new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
604 except errors.OpPrereqError:
605 pass
606 else:
607 raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
608 " node group (UUID: %s)" %
609 (self.op.new_name, new_name_uuid),
610 errors.ECODE_EXISTS)
611
613 """Build hooks env.
614
615 """
616 return {
617 "OLD_NAME": self.op.group_name,
618 "NEW_NAME": self.op.new_name,
619 }
620
635
  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      self.target_uuids = self.req_target_uuids
    else:
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

    if not self.target_uuids:
      raise errors.OpPrereqError("There are no possible target groups",
                                 errors.ECODE_INVAL)

761 """Build hooks env.
762
763 """
764 return {
765 "GROUP_NAME": self.op.group_name,
766 "TARGET_GROUPS": " ".join(self.target_uuids),
767 }
768
770 """Build hooks nodes.
771
772 """
773 mn = self.cfg.GetMasterNode()
774
775 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
776
777 run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
778
779 return (run_nodes, run_nodes)
780
  @staticmethod
  def _MigrateToFailover(op):
    """Return an equivalent failover opcode for a migrate one.

    If the argument is not a migrate opcode, it is returned unchanged.

    """
    if not isinstance(op, opcodes.OpInstanceMigrate):
      return op
    else:
      return opcodes.OpInstanceFailover(
        instance_name=op.instance_name,
        instance_uuid=getattr(op, "instance_uuid", None),
        target_node=getattr(op, "target_node", None),
        target_node_uuid=getattr(op, "target_node_uuid", None),
        ignore_ipolicy=op.ignore_ipolicy,
        cleanup=op.cleanup)

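  # A small sketch (illustrative only) of the conversion above: a migrate
  # opcode becomes a failover opcode carrying the same instance, target node
  # and cleanup flag, while any other opcode passes through unchanged.
  #
  #   migrate = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
  #                                       cleanup=False, ignore_ipolicy=False)
  #   failover = LUGroupEvacuate._MigrateToFailover(migrate)
  #   # isinstance(failover, opcodes.OpInstanceFailover) is now True
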
  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    if self.op.force_failover:
      self.LogInfo("Will insist on failovers")
      jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]

    if self.op.sequential:
      self.LogInfo("Jobs will be submitted to run sequentially")
      for job in jobs[1:]:
        for op in job:
          op.depends = [(-1, ["error", "success"])]

    return ResultWithJobs(jobs)

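  # Descriptive note (not part of the original code): with sequential
  # submission, every job except the first carries the relative dependency
  # [(-1, ["error", "success"])], i.e. "wait for the job submitted immediately
  # before this one, whether it failed or succeeded", which serializes the
  # evacuation jobs instead of letting them run in parallel.
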
835 """Verifies the status of all disks in a node group.
836
837 """
838 REQ_BGL = False
839
856
891
908
  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      self.cfg,
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))

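  # Descriptive note (not part of the original code): entries left in
  # node_lv_to_inst after the pop() calls above are LVs that a node was
  # expected to report but did not, so missing_disks ends up shaped like
  # (names made up):
  #
  #   {"inst1.example.com": [["node-uuid-1", "xenvg/disk0"], ...]}
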
  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
        continue

      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(inst.uuid)
      for node_uuid in itertools.chain([inst.primary_node],
                                       secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(self.cfg.GetInstanceDisks(inst.uuid), inst)
                    for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        disks = self.cfg.GetInstanceDisks(inst.uuid)
        inst_disk_uuids = set([disk.uuid for disk in disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
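
# Hedged example (not part of the original module) of the triple returned by
# LUGroupVerifyDisks.Exec(), with made-up names:
#
#   ({"node-uuid-2": "Node unreachable"},   # per-node RPC errors
#    ["inst1.example.com"],                 # instances needing activate-disks
#    {"inst2.example.com": [["node-uuid-3", "xenvg/disk0"]]})  # missing volumes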