22 """Logical units dealing with node groups."""
23
24 import logging
25
26 from ganeti import constants
27 from ganeti import errors
28 from ganeti import locking
29 from ganeti import objects
30 from ganeti import qlang
31 from ganeti import query
32 from ganeti import utils
33 from ganeti.masterd import iallocator
34 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
35 ResultWithJobs
36 from ganeti.cmdlib.common import MergeAndVerifyHvState, \
37 MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
38 CheckNodeGroupInstances, GetUpdatedIPolicy, \
39 ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
40 CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceDisksToNodes
41
42 import ganeti.masterd.instance
46 """Logical unit for creating node groups.
47
48 """
49 HPATH = "group-add"
50 HTYPE = constants.HTYPE_GROUP
51 REQ_BGL = False
52
54
55
56
57 self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
58 self.needed_locks = {}
59 self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
60
62 """Check prerequisites.
63
64 This checks that the given group name is not an existing node group
65 already.
66
67 """
68 try:
69 existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
70 except errors.OpPrereqError:
71 pass
72 else:
73 raise errors.OpPrereqError("Desired group name '%s' already exists as a"
74 " node group (UUID: %s)" %
75 (self.op.group_name, existing_uuid),
76 errors.ECODE_EXISTS)
77
78 if self.op.ndparams:
79 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
80
81 if self.op.hv_state:
82 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
83 else:
84 self.new_hv_state = None
85
86 if self.op.disk_state:
87 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
88 else:
89 self.new_disk_state = None
90
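    # Disk parameters are given per disk template; type-check each provided
    # sub-dict here and verify the whole set against the defaults in
    # constants.DISK_DT_DEFAULTS below.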
    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
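    # Declare that we don't want to remove the node group lock anymore, as
    # we've added the group to the configuration.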
    del self.remove_locks[locking.LEVEL_NODEGROUP]


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.op.nodes = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes and the *destination* group; the "source"
    # groups can only be determined from node information, fetched later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.nodes,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet; this is verified again in CheckPrereq.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.nodes))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

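    # Determine which internally mirrored (DRBD) instances would end up with
    # their nodes spread over more than one group if this assignment were
    # applied.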
    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
                                             for node in self.op.nodes],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(previous_splits)))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_name, self.group_uuid) for node_name in self.op.nodes]

    self.cfg.AssignGroupNodes(mods)

  @staticmethod
  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split, and remain so after the change.

    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_name, new_group_uuid) pairs
    @param changes: list of node assignments to consider
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously okay and would become
      split as a consequence of this change, and a list of instances that were
      previously split and that this change does not fix

    """
    changed_nodes = dict((node, group) for node, group in changes
                         if node_data[node].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    def InstanceNodes(instance):
      return [instance.primary_node] + list(instance.secondary_nodes)

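    # An instance is "split" when the groups of its nodes are not all the same.
    # Compute this for the current configuration and for the configuration
    # resulting from the requested changes (changed_nodes overrides the group
    # of any node that is being moved).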
    for inst in instance_data.values():
      if inst.disk_template not in constants.DTS_INT_MIRROR:
        continue

      instance_nodes = InstanceNodes(inst)

      if len(set(node_data[node].group for node in instance_nodes)) > 1:
        previously_split_instances.add(inst.name)

      if len(set(changed_nodes.get(node, node_data[node].group)
                 for node in instance_nodes)) > 1:
        all_split_instances.add(inst.name)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))


class GroupQuery(QueryBase):
  FIELDS = query.GROUP_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
    self._cluster = lu.cfg.GetClusterInfo()
    name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())

    if not self.names:
      self.wanted = [name_to_uuid[name]
                     for name in utils.NiceSort(name_to_uuid.keys())]
    else:
      # Accept the given names to be either group names or UUIDs.
      missing = []
      self.wanted = []
      all_uuid = frozenset(self._all_groups.keys())

      for name in self.names:
        if name in all_uuid:
          self.wanted.append(name)
        elif name in name_to_uuid:
          self.wanted.append(name_to_uuid[name])
        else:
          missing.append(name)

      if missing:
        raise errors.OpPrereqError("Some groups do not exist: %s" %
                                   utils.CommaJoin(missing),
                                   errors.ECODE_NOENT)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of node groups and their attributes.

    """
    do_nodes = query.GQ_NODE in self.requested_data
    do_instances = query.GQ_INST in self.requested_data

    group_to_nodes = None
    group_to_instances = None

    # For GQ_NODE we need a map of group to nodes, and for GQ_INST a map of
    # group to instances. The latter cannot be built from instance data alone,
    # since instances only reference their nodes; therefore the node list is
    # processed whenever either piece of data is requested.
    if do_nodes or do_instances:
      all_nodes = lu.cfg.GetAllNodesInfo()
      group_to_nodes = dict((uuid, []) for uuid in self.wanted)
      node_to_group = {}

      for node in all_nodes.values():
        if node.group in group_to_nodes:
          group_to_nodes[node.group].append(node.name)
          node_to_group[node.name] = node.group

      if do_instances:
        all_instances = lu.cfg.GetAllInstancesInfo()
        group_to_instances = dict((uuid, []) for uuid in self.wanted)

        for instance in all_instances.values():
          node = instance.primary_node
          if node in node_to_group:
            group_to_instances[node_to_group[node]].append(instance.name)

        if not do_nodes:
          # Do not pass on node information if it was not requested.
          group_to_nodes = None

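    # The final argument tells the query layer whether disk parameters were
    # requested and therefore have to be filled in.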
    return query.GroupQueryData(self._cluster,
                                [self._all_groups[uuid]
                                 for uuid in self.wanted],
                                group_to_nodes, group_to_instances,
                                query.GQ_DISKPARAMS in self.requested_data)


class LUGroupQuery(NoHooksLU):
  """Logical unit for querying node groups.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.gq = GroupQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, False)

  def ExpandNames(self):
    self.gq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.gq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.gq.OldStyleQuery(self)

380 """Modifies the parameters of a node group.
381
382 """
383 HPATH = "group-modify"
384 HTYPE = constants.HTYPE_GROUP
385 REQ_BGL = False
386
388 all_changes = [
389 self.op.ndparams,
390 self.op.diskparams,
391 self.op.alloc_policy,
392 self.op.hv_state,
393 self.op.disk_state,
394 self.op.ipolicy,
395 ]
396
397 if all_changes.count(None) == len(all_changes):
398 raise errors.OpPrereqError("Please pass at least one modification",
399 errors.ECODE_INVAL)
400
402
403 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
404
405 self.needed_locks = {
406 locking.LEVEL_INSTANCE: [],
407 locking.LEVEL_NODEGROUP: [self.group_uuid],
408 }
409
410 self.share_locks[locking.LEVEL_INSTANCE] = 1
411
413 if level == locking.LEVEL_INSTANCE:
414 assert not self.needed_locks[locking.LEVEL_INSTANCE]
415
416
417
418 self.needed_locks[locking.LEVEL_INSTANCE] = \
419 self.cfg.GetNodeGroupInstances(self.group_uuid)
420
  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

431 """Check prerequisites.
432
433 """
434 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
435
436
437 CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
438
439 self.group = self.cfg.GetNodeGroup(self.group_uuid)
440 cluster = self.cfg.GetClusterInfo()
441
442 if self.group is None:
443 raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
444 (self.op.group_name, self.group_uuid))
445
446 if self.op.ndparams:
447 new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
448 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
449 self.new_ndparams = new_ndparams
450
451 if self.op.diskparams:
452 diskparams = self.group.diskparams
453 uavdp = self._UpdateAndVerifyDiskParams
454
455 new_diskparams = dict((dt,
456 uavdp(diskparams.get(dt, {}),
457 self.op.diskparams[dt]))
458 for dt in constants.DISK_TEMPLATES
459 if dt in self.op.diskparams)
460
461
462 self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
463 try:
464 utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
465 except errors.OpPrereqError, err:
466 raise errors.OpPrereqError("While verify diskparams options: %s" % err,
467 errors.ECODE_INVAL)
468
469 if self.op.hv_state:
470 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
471 self.group.hv_state_static)
472
473 if self.op.disk_state:
474 self.new_disk_state = \
475 MergeAndVerifyDiskState(self.op.disk_state,
476 self.group.disk_state_static)
477
478 if self.op.ipolicy:
479 self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
480 self.op.ipolicy,
481 group_policy=True)
482
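      # Fill the new policy with cluster defaults and warn about instances in
      # this group that would violate the resulting instance policy.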
      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      inst_filter = lambda inst: inst.name in owned_instances
      instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
      gmi = ganeti.masterd.instance
      violations = \
        ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                               self.group),
                                     new_ipolicy, instances, self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate it: %s",
                        utils.CommaJoin(violations))

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.name
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify that the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))

    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures the requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.name for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically; this needs to be verified once the
      # node and group locks have been acquired.
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by the instances optimistically; this requires
        # going via the node before it's locked, requiring verification later.
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(instance_name))
      else:
        # No target groups were given, so all groups need to be locked.
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances.
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in the group to be evacuated and in the target groups.
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_nodes = [node_name
                      for group in owned_groups
                      for node_name in self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_nodes, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    instances = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

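    # Ask the instance allocator for a placement of all affected instances in
    # the chosen target groups; the result is converted into jobs below.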
    req = iallocator.IAReqGroupChange(instances=instances,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    return ResultWithJobs(jobs)


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if the group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],

      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
      # starts one instance of this opcode for every group, which means all
      # nodes will be locked for a short amount of time, so it's better to
      # acquire the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically; this needs to be verified once the
      # node and group locks have been acquired.
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by the instances optimistically; this
            # requires going via the node before it's locked, requiring
            # verification later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which
      # contain actual instances.
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in the group to be verified.
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_nodes, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
      which need activate-disks, dict of instance: (node, volume) for
      missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

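    # Map (node name, volume name) pairs to the owning instance, for all
    # instances whose disks are expected to be active.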
    nv_dict = MapInstanceDisksToNodes(
      [inst for inst in self.instances.values() if inst.disks_active])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

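      # Ask each locked, VM-capable node which logical volumes it knows about,
      # so they can be matched against the expected volumes.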
      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

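        # A volume that is reported by the node but not online means the
        # owning instance needs its disks activated.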
        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

    # Any leftover (node, volume) pairs in nv_dict were not reported by any
    # node and therefore correspond to missing volumes.
    for key, inst in nv_dict.iteritems():
      res_missing.setdefault(inst, []).append(list(key))

    return (res_nodes, list(res_instances), res_missing)