1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Logical units dealing with node groups."""
32
33 import itertools
34 import logging
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import objects
40 from ganeti import opcodes
41 from ganeti import qlang
42 from ganeti import query
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
46 ResultWithJobs
47 from ganeti.cmdlib.common import MergeAndVerifyHvState, \
48 MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
49 CheckNodeGroupInstances, GetUpdatedIPolicy, \
50 ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
51 CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
52 CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
53 CheckDiskAccessModeConsistency
54
55 import ganeti.masterd.instance
59 """Logical unit for creating node groups.
60
61 """
62 HPATH = "group-add"
63 HTYPE = constants.HTYPE_GROUP
64 REQ_BGL = False
65
67
68
69
70 self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
71 self.needed_locks = {}
72 self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
73
88
134
136 """Build hooks env.
137
138 """
139 return {
140 "GROUP_NAME": self.op.group_name,
141 }
142
144 """Build hooks nodes.
145
146 """
147 mn = self.cfg.GetMasterNode()
148 return ([mn], [mn])
149
150 - def Exec(self, feedback_fn):
151 """Add the node group to the cluster.
152
153 """
154 group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
155 uuid=self.group_uuid,
156 alloc_policy=self.op.alloc_policy,
157 ndparams=self.op.ndparams,
158 diskparams=self.new_diskparams,
159 ipolicy=self.op.ipolicy,
160 hv_state_static=self.new_hv_state,
161 disk_state_static=self.new_disk_state)
162
163 self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
164 del self.remove_locks[locking.LEVEL_NODEGROUP]
165
168 """Logical unit for assigning nodes to groups.
169
170 """
171 REQ_BGL = False
172
174
175 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
176 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
177
178
179
180
181 self.needed_locks = {
182 locking.LEVEL_NODEGROUP: set([self.group_uuid]),
183 locking.LEVEL_NODE: self.op.node_uuids,
184 }
185
187 if level == locking.LEVEL_NODEGROUP:
188 assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
189
190
191
192 groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)
193
194 self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
195
197 """Check prerequisites.
198
199 """
200 assert self.needed_locks[locking.LEVEL_NODEGROUP]
201 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
202 frozenset(self.op.node_uuids))
203
204 expected_locks = (set([self.group_uuid]) |
205 self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
206 actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
207 if actual_locks != expected_locks:
208 raise errors.OpExecError("Nodes changed groups since locks were acquired,"
209 " current groups are '%s', used to be '%s'" %
210 (utils.CommaJoin(expected_locks),
211 utils.CommaJoin(actual_locks)))
212
213 self.node_data = self.cfg.GetAllNodesInfo()
214 self.group = self.cfg.GetNodeGroup(self.group_uuid)
215 instance_data = self.cfg.GetAllInstancesInfo()
216
217 if self.group is None:
218 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
219 (self.op.group_name, self.group_uuid))
220
221 (new_splits, previous_splits) = \
222 self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
223 for uuid in self.op.node_uuids],
224 self.node_data, instance_data)
225
226 if new_splits:
227 fmt_new_splits = utils.CommaJoin(utils.NiceSort(
228 self.cfg.GetInstanceNames(new_splits)))
229
230 if not self.op.force:
231 raise errors.OpExecError("The following instances get split by this"
232 " change and --force was not given: %s" %
233 fmt_new_splits)
234 else:
235 self.LogWarning("This operation will split the following instances: %s",
236 fmt_new_splits)
237
238 if previous_splits:
239 self.LogWarning("In addition, these already-split instances continue"
240 " to be split across groups: %s",
241 utils.CommaJoin(utils.NiceSort(
242 self.cfg.GetInstanceNames(previous_splits))))
243
244 - def Exec(self, feedback_fn):
245 """Assign nodes to a new group.
246
247 """
248 mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]
249
250 self.cfg.AssignGroupNodes(mods)
251
252 @staticmethod
254 """Check for split instances after a node assignment.
255
256 This method considers a series of node assignments as an atomic operation,
257 and returns information about split instances after applying the set of
258 changes.
259
260 In particular, it returns information about newly split instances, and
261 instances that were already split, and remain so after the change.
262
263 Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
264 considered.
265
266 @type changes: list of (node_uuid, new_group_uuid) pairs.
267 @param changes: list of node assignments to consider.
268 @param node_data: a dict with data for all nodes
269 @param instance_data: a dict with all instances to consider
270 @rtype: a two-tuple
271 @return: a list of instances that were previously okay and result split as a
272 consequence of this change, and a list of instances that were previously
273 split and this change does not fix.
274
275 """
276 changed_nodes = dict((uuid, group) for uuid, group in changes
277 if node_data[uuid].group != group)
278
279 all_split_instances = set()
280 previously_split_instances = set()
281
282 for inst in instance_data.values():
283 if inst.disk_template not in constants.DTS_INT_MIRROR:
284 continue
285
286 if len(set(node_data[node_uuid].group
287 for node_uuid in inst.all_nodes)) > 1:
288 previously_split_instances.add(inst.uuid)
289
290 if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
291 for node_uuid in inst.all_nodes)) > 1:
292 all_split_instances.add(inst.uuid)
293
294 return (list(all_split_instances - previously_split_instances),
295 list(previously_split_instances & all_split_instances))
296
299 FIELDS = query.GROUP_FIELDS
300
302 lu.needed_locks = {}
303
304 self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
305 self._cluster = lu.cfg.GetClusterInfo()
306 name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
307
308 if not self.names:
309 self.wanted = [name_to_uuid[name]
310 for name in utils.NiceSort(name_to_uuid.keys())]
311 else:
312
313 missing = []
314 self.wanted = []
315 all_uuid = frozenset(self._all_groups.keys())
316
317 for name in self.names:
318 if name in all_uuid:
319 self.wanted.append(name)
320 elif name in name_to_uuid:
321 self.wanted.append(name_to_uuid[name])
322 else:
323 missing.append(name)
324
325 if missing:
326 raise errors.OpPrereqError("Some groups do not exist: %s" %
327 utils.CommaJoin(missing),
328 errors.ECODE_NOENT)
329
332
334 """Computes the list of node groups and their attributes.
335
336 """
337 do_nodes = query.GQ_NODE in self.requested_data
338 do_instances = query.GQ_INST in self.requested_data
339
340 group_to_nodes = None
341 group_to_instances = None
342
343
344
345
346
347
348 if do_nodes or do_instances:
349 all_nodes = lu.cfg.GetAllNodesInfo()
350 group_to_nodes = dict((uuid, []) for uuid in self.wanted)
351 node_to_group = {}
352
353 for node in all_nodes.values():
354 if node.group in group_to_nodes:
355 group_to_nodes[node.group].append(node.uuid)
356 node_to_group[node.uuid] = node.group
357
358 if do_instances:
359 all_instances = lu.cfg.GetAllInstancesInfo()
360 group_to_instances = dict((uuid, []) for uuid in self.wanted)
361
362 for instance in all_instances.values():
363 node = instance.primary_node
364 if node in node_to_group:
365 group_to_instances[node_to_group[node]].append(instance.uuid)
366
367 if not do_nodes:
368
369 group_to_nodes = None
370
371 return query.GroupQueryData(self._cluster,
372 [self._all_groups[uuid]
373 for uuid in self.wanted],
374 group_to_nodes, group_to_instances,
375 query.GQ_DISKPARAMS in self.requested_data)
376
379 """Logical unit for querying node groups.
380
381 """
382 REQ_BGL = False
383
387
390
393
394 - def Exec(self, feedback_fn):
396
399 """Modifies the parameters of a node group.
400
401 """
402 HPATH = "group-modify"
403 HTYPE = constants.HTYPE_GROUP
404 REQ_BGL = False
405
407 all_changes = [
408 self.op.ndparams,
409 self.op.diskparams,
410 self.op.alloc_policy,
411 self.op.hv_state,
412 self.op.disk_state,
413 self.op.ipolicy,
414 ]
415
416 if all_changes.count(None) == len(all_changes):
417 raise errors.OpPrereqError("Please pass at least one modification",
418 errors.ECODE_INVAL)
419
420 if self.op.diskparams:
421 CheckDiskAccessModeValidity(self.op.diskparams)
422
424
425 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
426
427 self.needed_locks = {
428 locking.LEVEL_INSTANCE: [],
429 locking.LEVEL_NODEGROUP: [self.group_uuid],
430 }
431
432 self.share_locks[locking.LEVEL_INSTANCE] = 1
433
443
444 @staticmethod
452
483
485 """Check prerequisites.
486
487 """
488 owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
489
490
491 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)
492
493 self.group = self.cfg.GetNodeGroup(self.group_uuid)
494 cluster = self.cfg.GetClusterInfo()
495
496 if self.group is None:
497 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
498 (self.op.group_name, self.group_uuid))
499
500 if self.op.ndparams:
501 new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
502 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
503 self.new_ndparams = new_ndparams
504
505 if self.op.diskparams:
506 diskparams = self.group.diskparams
507 uavdp = self._UpdateAndVerifyDiskParams
508
509 new_diskparams = dict((dt,
510 uavdp(diskparams.get(dt, {}),
511 self.op.diskparams[dt]))
512 for dt in constants.DISK_TEMPLATES
513 if dt in self.op.diskparams)
514
515
516 self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
517
518 try:
519 utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
520 CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
521 group=self.group)
522 except errors.OpPrereqError, err:
523 raise errors.OpPrereqError("While verify diskparams options: %s" % err,
524 errors.ECODE_INVAL)
525
526 if self.op.hv_state:
527 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
528 self.group.hv_state_static)
529
530 if self.op.disk_state:
531 self.new_disk_state = \
532 MergeAndVerifyDiskState(self.op.disk_state,
533 self.group.disk_state_static)
534
535 self._CheckIpolicy(cluster, owned_instance_names)
536
538 """Build hooks env.
539
540 """
541 return {
542 "GROUP_NAME": self.op.group_name,
543 "NEW_ALLOC_POLICY": self.op.alloc_policy,
544 }
545
547 """Build hooks nodes.
548
549 """
550 mn = self.cfg.GetMasterNode()
551 return ([mn], [mn])
552
553 - def Exec(self, feedback_fn):
554 """Modifies the node group.
555
556 """
557 result = []
558
559 if self.op.ndparams:
560 self.group.ndparams = self.new_ndparams
561 result.append(("ndparams", str(self.group.ndparams)))
562
563 if self.op.diskparams:
564 self.group.diskparams = self.new_diskparams
565 result.append(("diskparams", str(self.group.diskparams)))
566
567 if self.op.alloc_policy:
568 self.group.alloc_policy = self.op.alloc_policy
569
570 if self.op.hv_state:
571 self.group.hv_state_static = self.new_hv_state
572
573 if self.op.disk_state:
574 self.group.disk_state_static = self.new_disk_state
575
576 if self.op.ipolicy:
577 self.group.ipolicy = self.new_ipolicy
578
579 self.cfg.Update(self.group, feedback_fn)
580 return result
581
584 HPATH = "group-remove"
585 HTYPE = constants.HTYPE_GROUP
586 REQ_BGL = False
587
589
590 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
591 self.needed_locks = {
592 locking.LEVEL_NODEGROUP: [self.group_uuid],
593 }
594
596 """Check prerequisites.
597
598 This checks that the given group name exists as a node group, that is
599 empty (i.e., contains no nodes), and that is not the last group of the
600 cluster.
601
602 """
603
604 group_nodes = [node.uuid
605 for node in self.cfg.GetAllNodesInfo().values()
606 if node.group == self.group_uuid]
607
608 if group_nodes:
609 raise errors.OpPrereqError("Group '%s' not empty, has the following"
610 " nodes: %s" %
611 (self.op.group_name,
612 utils.CommaJoin(utils.NiceSort(group_nodes))),
613 errors.ECODE_STATE)
614
615
616 if len(self.cfg.GetNodeGroupList()) == 1:
617 raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
618 " removed" % self.op.group_name,
619 errors.ECODE_STATE)
620
622 """Build hooks env.
623
624 """
625 return {
626 "GROUP_NAME": self.op.group_name,
627 }
628
630 """Build hooks nodes.
631
632 """
633 mn = self.cfg.GetMasterNode()
634 return ([mn], [mn])
635
636 - def Exec(self, feedback_fn):
637 """Remove the node group.
638
639 """
640 try:
641 self.cfg.RemoveNodeGroup(self.group_uuid)
642 except errors.ConfigurationError:
643 raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
644 (self.op.group_name, self.group_uuid))
645
646 self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
647
650 HPATH = "group-rename"
651 HTYPE = constants.HTYPE_GROUP
652 REQ_BGL = False
653
655
656 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
657
658 self.needed_locks = {
659 locking.LEVEL_NODEGROUP: [self.group_uuid],
660 }
661
663 """Check prerequisites.
664
665 Ensures requested new name is not yet used.
666
667 """
668 try:
669 new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
670 except errors.OpPrereqError:
671 pass
672 else:
673 raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
674 " node group (UUID: %s)" %
675 (self.op.new_name, new_name_uuid),
676 errors.ECODE_EXISTS)
677
679 """Build hooks env.
680
681 """
682 return {
683 "OLD_NAME": self.op.group_name,
684 "NEW_NAME": self.op.new_name,
685 }
686
701
702 - def Exec(self, feedback_fn):
703 """Rename the node group.
704
705 """
706 group = self.cfg.GetNodeGroup(self.group_uuid)
707
708 if group is None:
709 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
710 (self.op.group_name, self.group_uuid))
711
712 group.name = self.op.new_name
713 self.cfg.Update(group, feedback_fn)
714
715 return self.op.new_name
716
719 HPATH = "group-evacuate"
720 HTYPE = constants.HTYPE_GROUP
721 REQ_BGL = False
722
724
725 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
726
727 if self.op.target_groups:
728 self.req_target_uuids = map(self.cfg.LookupNodeGroup,
729 self.op.target_groups)
730 else:
731 self.req_target_uuids = []
732
733 if self.group_uuid in self.req_target_uuids:
734 raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
735 " as a target group (targets are %s)" %
736 (self.group_uuid,
737 utils.CommaJoin(self.req_target_uuids)),
738 errors.ECODE_INVAL)
739
740 self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)
741
742 self.share_locks = ShareAll()
743 self.needed_locks = {
744 locking.LEVEL_INSTANCE: [],
745 locking.LEVEL_NODEGROUP: [],
746 locking.LEVEL_NODE: [],
747 }
748
750 if level == locking.LEVEL_INSTANCE:
751 assert not self.needed_locks[locking.LEVEL_INSTANCE]
752
753
754
755 self.needed_locks[locking.LEVEL_INSTANCE] = \
756 self.cfg.GetInstanceNames(
757 self.cfg.GetNodeGroupInstances(self.group_uuid))
758
759 elif level == locking.LEVEL_NODEGROUP:
760 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
761
762 if self.req_target_uuids:
763 lock_groups = set([self.group_uuid] + self.req_target_uuids)
764
765
766
767 lock_groups.update(group_uuid
768 for instance_name in
769 self.owned_locks(locking.LEVEL_INSTANCE)
770 for group_uuid in
771 self.cfg.GetInstanceNodeGroups(
772 self.cfg.GetInstanceInfoByName(instance_name)
773 .uuid))
774 else:
775
776 lock_groups = locking.ALL_SET
777
778 self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
779
780 elif level == locking.LEVEL_NODE:
781
782
783 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
784 self._LockInstancesNodes()
785
786
787 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
788 assert self.group_uuid in owned_groups
789 member_node_uuids = [node_uuid
790 for group in owned_groups
791 for node_uuid in
792 self.cfg.GetNodeGroup(group).members]
793 self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)
794
796 owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
797 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
798 owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))
799
800 assert owned_groups.issuperset(self.req_target_uuids)
801 assert self.group_uuid in owned_groups
802
803
804 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)
805
806
807 self.instances = \
808 dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
809
810
811 CheckInstancesNodeGroups(self.cfg, self.instances,
812 owned_groups, owned_node_uuids, self.group_uuid)
813
814 if self.req_target_uuids:
815
816 self.target_uuids = self.req_target_uuids
817 else:
818
819 self.target_uuids = [group_uuid for group_uuid in owned_groups
820 if group_uuid != self.group_uuid]
821
822 if not self.target_uuids:
823 raise errors.OpPrereqError("There are no possible target groups",
824 errors.ECODE_INVAL)
825
827 """Build hooks env.
828
829 """
830 return {
831 "GROUP_NAME": self.op.group_name,
832 "TARGET_GROUPS": " ".join(self.target_uuids),
833 }
834
836 """Build hooks nodes.
837
838 """
839 mn = self.cfg.GetMasterNode()
840
841 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
842
843 run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
844
845 return (run_nodes, run_nodes)
846
847 @staticmethod
849 """Return an equivalent failover opcode for a migrate one.
850
851 If the argument is not a failover opcode, return it unchanged.
852
853 """
854 if not isinstance(op, opcodes.OpInstanceMigrate):
855 return op
856 else:
857 return opcodes.OpInstanceFailover(
858 instance_name=op.instance_name,
859 instance_uuid=getattr(op, "instance_uuid", None),
860 target_node=getattr(op, "target_node", None),
861 target_node_uuid=getattr(op, "target_node_uuid", None),
862 ignore_ipolicy=op.ignore_ipolicy,
863 cleanup=op.cleanup)
864
865 - def Exec(self, feedback_fn):
866 inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))
867
868 assert self.group_uuid not in self.target_uuids
869
870 req = iallocator.IAReqGroupChange(instances=inst_names,
871 target_groups=self.target_uuids)
872 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
873
874 ial.Run(self.op.iallocator)
875
876 if not ial.success:
877 raise errors.OpPrereqError("Can't compute group evacuation using"
878 " iallocator '%s': %s" %
879 (self.op.iallocator, ial.info),
880 errors.ECODE_NORES)
881
882 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
883
884 self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
885 len(jobs), self.op.group_name)
886
887 if self.op.force_failover:
888 self.LogInfo("Will insist on failovers")
889 jobs = [[self._MigrateToFailover(op) for op in job] for job in jobs]
890
891 if self.op.sequential:
892 self.LogInfo("Jobs will be submitted to run sequentially")
893 for job in jobs[1:]:
894 for op in job:
895 op.depends = [(-1, ["error", "success"])]
896
897 return ResultWithJobs(jobs)
898
901 """Verifies the status of all disks in a node group.
902
903 """
904 REQ_BGL = False
905
922
957
959 owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
960 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
961 owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))
962
963 assert self.group_uuid in owned_groups
964
965
966 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)
967
968
969 self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))
970
971
972 CheckInstancesNodeGroups(self.cfg, self.instances,
973 owned_groups, owned_node_uuids, self.group_uuid)
974
975 - def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
976 missing_disks):
977 node_lv_to_inst = MapInstanceLvsToNodes(
978 [inst for inst in self.instances.values() if inst.disks_active])
979 if node_lv_to_inst:
980 node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
981 set(self.cfg.GetVmCapableNodeList()))
982
983 node_lvs = self.rpc.call_lv_list(node_uuids, [])
984
985 for (node_uuid, node_res) in node_lvs.items():
986 if node_res.offline:
987 continue
988
989 msg = node_res.fail_msg
990 if msg:
991 logging.warning("Error enumerating LVs on node %s: %s",
992 self.cfg.GetNodeName(node_uuid), msg)
993 node_errors[node_uuid] = msg
994 continue
995
996 for lv_name, (_, _, lv_online) in node_res.payload.items():
997 inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
998 if not lv_online and inst is not None:
999 offline_disk_instance_names.add(inst.name)
1000
1001
1002
1003 for key, inst in node_lv_to_inst.iteritems():
1004 missing_disks.setdefault(inst.name, []).append(list(key))
1005
1007 node_to_inst = {}
1008 for inst in self.instances.values():
1009 if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
1010 continue
1011
1012 for node_uuid in itertools.chain([inst.primary_node],
1013 inst.secondary_nodes):
1014 node_to_inst.setdefault(node_uuid, []).append(inst)
1015
1016 for (node_uuid, insts) in node_to_inst.items():
1017 node_disks = [(inst.disks, inst) for inst in insts]
1018 node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
1019 msg = node_res.fail_msg
1020 if msg:
1021 logging.warning("Error getting DRBD status on node %s: %s",
1022 self.cfg.GetNodeName(node_uuid), msg)
1023 node_errors[node_uuid] = msg
1024 continue
1025
1026 faulty_disk_uuids = set(node_res.payload)
1027 for inst in self.instances.values():
1028 inst_disk_uuids = set([disk.uuid for disk in inst.disks])
1029 if inst_disk_uuids.intersection(faulty_disk_uuids):
1030 offline_disk_instance_names.add(inst.name)
1031
1032 - def Exec(self, feedback_fn):
1033 """Verify integrity of cluster disks.
1034
1035 @rtype: tuple of three items
1036 @return: a tuple of (dict of node-to-node_error, list of instances
1037 which need activate-disks, dict of instance: (node, volume) for
1038 missing volumes
1039
1040 """
1041 node_errors = {}
1042 offline_disk_instance_names = set()
1043 missing_disks = {}
1044
1045 self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
1046 missing_disks)
1047 self._VerifyDrbdStates(node_errors, offline_disk_instance_names)
1048
1049 return (node_errors, list(offline_disk_instance_names), missing_disks)
1050