1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the iallocator code."""
32
33 from ganeti import compat
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import ht
37 from ganeti import outils
38 from ganeti import opcodes
39 import ganeti.rpc.node as rpc
40 from ganeti import serializer
41 from ganeti import utils
42
43 import ganeti.masterd.instance as gmi
44
45 import logging
46
# Validator for a list of plain strings (tags, instance names, ...)
_STRING_LIST = ht.TListOf(ht.TString)

# Validator for the job lists a node-evacuation returns: a list of jobs,
# each job a list of opcode dicts whose OP_ID is restricted to the three
# instance-move operations an iallocator may request
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
   # pylint: disable=E1101
   # Class '...' has no 'OP_ID' member
   "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
                        opcodes.OpInstanceMigrate.OP_ID,
                        opcodes.OpInstanceReplaceDisks.OP_ID]),
   })))

# "moved" part of a node-evacuation result: triples of
# (instance name, target group name, list of node names)
_NEVAC_MOVED = \
  ht.TListOf(ht.TAnd(ht.TIsLength(3),
                     ht.TItems([ht.TNonEmptyString,
                                ht.TNonEmptyString,
                                ht.TListOf(ht.TNonEmptyString),
                                ])))

# "failed" part of a node-evacuation result: pairs of
# (instance name, optional error message)
_NEVAC_FAILED = \
  ht.TListOf(ht.TAnd(ht.TIsLength(2),
                     ht.TItems([ht.TNonEmptyString,
                                ht.TMaybeString,
                                ])))

# Complete node-evacuation result: (moved, failed, jobs)
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
                        ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))

# Shared (parameter name, validator) pairs used in REQ_PARAMS lists below
_INST_NAME = ("name", ht.TNonEmptyString)
_INST_UUID = ("inst_uuid", ht.TNonEmptyString)
75 """Meta class for request definitions.
76
77 """
78 @classmethod
80 """Extract the slots out of REQ_PARAMS.
81
82 """
83 params = attrs.setdefault("REQ_PARAMS", [])
84 return [slot for (slot, _) in params]
85
88 """A generic IAllocator request object.
89
90 """
__metaclass__ = _AutoReqParam

# Iallocator mode implemented by the request; must be overridden
MODE = NotImplemented
# List of (parameter name, validator) pairs; checked by Validate()
REQ_PARAMS = []
# Validator applied to the iallocator's reply for this request type
REQ_RESULT = NotImplemented
96
def __init__(self, **kwargs):
  """Constructor for IARequestBase.

  The constructor takes only keyword arguments and will set
  attributes on this object based on the passed arguments. As such,
  it means that you should not pass arguments which are not in the
  REQ_PARAMS attribute for this class.

  """
  outils.ValidatedSlots.__init__(self, **kwargs)

  # Check MODE and all declared parameters right away, so an invalid
  # request never leaves the constructor
  self.Validate()
109
def Validate(self):
  """Validates all parameters of the request.

  This method returns L{None} if the validation succeeds, or raises
  an exception otherwise.

  @rtype: NoneType
  @return: L{None}, if the validation succeeds

  @raise Exception: validation fails

  """
  assert self.MODE in constants.VALID_IALLOCATOR_MODES

  for (param, validator) in self.REQ_PARAMS:
    # the attribute must exist at all ...
    if not hasattr(self, param):
      raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
                                 errors.ECODE_INVAL)

    # ... and satisfy its declared validator
    value = getattr(self, param)
    if not validator(value):
      raise errors.OpPrereqError(("Request parameter '%s' has invalid"
                                  " type %s/value %s") %
                                 (param, type(value), value),
                                 errors.ECODE_INVAL)
136
def GetRequest(self, cfg):
  """Gets the request data dict.

  Abstract; every concrete request type must override this.

  @param cfg: The configuration instance

  """
  raise NotImplementedError
144
def GetExtraParams(self):
  """Gets extra parameters to the IAllocator call.

  @rtype: dict
  @return: no extra parameters by default; subclasses may override

  """
  return {}
150
def ValidateResult(self, ia, result):
  """Validates the result of an request.

  @param ia: The IAllocator instance
  @param result: The IAllocator run result
  @raises ResultValidationError: If validation fails

  """
  # Only check the payload when the allocator reported success; a failed
  # run carries no meaningful result to validate
  if ia.success and not self.REQ_RESULT(result):
    raise errors.ResultValidationError("iallocator returned invalid result,"
                                       " expected %s, got %s" %
                                       (self.REQ_RESULT, result))
163
166 """An instance allocation request.
167
168 """
169
MODE = constants.IALLOCATOR_MODE_ALLOC
# (parameter name, validator) pairs; checked by IARequestBase.Validate
REQ_PARAMS = [
  _INST_NAME,
  ("memory", ht.TNonNegativeInt),
  ("spindle_use", ht.TNonNegativeInt),
  # list of disk definition dicts; GetRequest adds the disk template to each
  ("disks", ht.TListOf(ht.TDict)),
  ("disk_template", ht.TString),
  # optional target node group
  ("group_name", ht.TMaybe(ht.TNonEmptyString)),
  ("os", ht.TString),
  ("tags", _STRING_LIST),
  ("nics", ht.TListOf(ht.TDict)),
  ("vcpus", ht.TInt),
  ("hypervisor", ht.TString),
  # optional restriction of the candidate nodes
  ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
  ]
# the allocator's reply must be a list
REQ_RESULT = ht.TList
186
def RequiredNodes(self):
  """Calculates the required nodes based on the disk_template.

  @rtype: int
  @return: 2 for templates in L{constants.DTS_INT_MIRROR} (which need a
      secondary node as well), 1 otherwise

  """
  if self.disk_template in constants.DTS_INT_MIRROR:
    return 2
  else:
    return 1
195
def GetRequest(self, cfg):
  """Requests a new instance.

  The checks for the completeness of the opcode must have already been
  done.

  @rtype: dict
  @return: the allocation request to be passed to the iallocator

  """
  # annotate every disk with the template before computing the total
  # disk space requirement
  for d in self.disks:
    d[constants.IDISK_TYPE] = self.disk_template
  disk_space = gmi.ComputeDiskSize(self.disks)

  return {
    "name": self.name,
    "disk_template": self.disk_template,
    "group_name": self.group_name,
    "tags": self.tags,
    "os": self.os,
    "vcpus": self.vcpus,
    "memory": self.memory,
    "spindle_use": self.spindle_use,
    "disks": self.disks,
    "disk_space_total": disk_space,
    "nics": self.nics,
    "required_nodes": self.RequiredNodes(),
    "hypervisor": self.hypervisor,
    }
222
233
250
274
277 """A relocation request.
278
279 """
280
MODE = constants.IALLOCATOR_MODE_RELOC
# (parameter name, validator) pairs; checked by IARequestBase.Validate
REQ_PARAMS = [
  # the instance to relocate, identified by UUID
  _INST_UUID,
  # UUIDs of the nodes the instance is to be moved away from
  ("relocate_from_node_uuids", _STRING_LIST),
  ]
# the allocator's reply must be a list
REQ_RESULT = ht.TList
287
321
345
346 @staticmethod
348 """Returns a list of unique group names for a list of nodes.
349
350 @type node2group: dict
351 @param node2group: Map from node name to group UUID
352 @type groups: dict
353 @param groups: Group information
354 @type nodes: list
355 @param nodes: Node names
356
357 """
358 result = set()
359
360 for node in nodes:
361 try:
362 group_uuid = node2group[node]
363 except KeyError:
364
365 pass
366 else:
367 try:
368 group = groups[group_uuid]
369 except KeyError:
370
371 group_name = group_uuid
372 else:
373 group_name = group["name"]
374
375 result.add(group_name)
376
377 return sorted(result)
378
381 """A node evacuation request.
382
383 """
384
MODE = constants.IALLOCATOR_MODE_NODE_EVAC
# (parameter name, validator) pairs; checked by IARequestBase.Validate
REQ_PARAMS = [
  ("instances", _STRING_LIST),
  ("evac_mode", ht.TEvacMode),
  # optional flag, forwarded to the allocator via GetExtraParams as the
  # "ignore-soft-errors" option rather than in the request body
  ("ignore_soft_errors", ht.TMaybe(ht.TBool)),
  ]
# (moved, failed, jobs) triple
REQ_RESULT = _NEVAC_RESULT
392
def GetRequest(self, cfg):
  """Get data for node-evacuate requests.

  @rtype: dict
  @return: the request body; note that ignore_soft_errors is passed via
      GetExtraParams instead

  """
  return {
    "instances": self.instances,
    "evac_mode": self.evac_mode,
    }
401
def GetExtraParams(self):
  """Get extra iallocator command line options for
  node-evacuate requests.

  @rtype: dict
  @return: the "ignore-soft-errors" option (with no value) when
      ignore_soft_errors is set, otherwise an empty dict

  """
  if self.ignore_soft_errors:
    return {"ignore-soft-errors": None}
  else:
    return {}
411
433
436 """IAllocator framework.
437
438 An IAllocator instance has three sets of attributes:
439 - cfg that is needed to query the cluster
440 - input data (all members of the _KEYS class attribute are required)
441 - four buffer attributes (in|out_data|text), that represent the
442 input (to the external script) in text and data structure format,
443 and the output from it, again in two formats
444 - the result variables from the script (success, info, nodes) for
445 easy usage
446
447 """
448
449
450
def __init__(self, cfg, rpc_runner, req):
  """Initializes this object and immediately builds the allocator input.

  @param cfg: cluster configuration handle (queried via GetDetachedConfig
      and related calls)
  @param rpc_runner: RPC runner used for the node queries
  @param req: the request object (an IARequestBase subclass instance)

  """
  self.cfg = cfg
  self.rpc = rpc_runner
  self.req = req
  # init buffer variables: request/reply in both text and parsed form
  self.in_text = self.out_text = self.in_data = self.out_data = None
  # init result fields filled in by _ValidateResult
  self.success = self.info = self.result = None

  self._BuildInputData(req)
461
def _ComputeClusterDataNodeInfo(self, disk_templates, node_list,
                                cluster_info, hypervisor_name):
  """Prepare and execute node info call.

  @type disk_templates: list of string
  @param disk_templates: the disk templates of the instances to be allocated
  @type node_list: list of strings
  @param node_list: list of nodes' UUIDs
  @type cluster_info: L{objects.Cluster}
  @param cluster_info: the cluster's information from the config
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor name
  @rtype: same as the result of the node info RPC call
  @return: the result of the node info RPC call

  """
  storage_units_raw = utils.storage.GetStorageUnits(self.cfg, disk_templates)
  storage_units = rpc.PrepareStorageUnitsForNodes(self.cfg, storage_units_raw,
                                                  node_list)
  hvspecs = [(hypervisor_name, cluster_info.hvparams[hypervisor_name])]
  return self.rpc.call_node_info(node_list, storage_units, hvspecs)
483
def _ComputeClusterData(self, disk_template=None):
  """Compute the generic allocator input data.

  @type disk_template: string
  @param disk_template: the disk template of the instances to be allocated;
      if not given, the cluster's first enabled disk template is used

  """
  cfg = self.cfg.GetDetachedConfig()
  cluster_info = cfg.GetClusterInfo()

  # cluster-wide data
  data = {
    "version": constants.IALLOCATOR_VERSION,
    "cluster_name": cluster_info.cluster_name,
    "cluster_tags": list(cluster_info.GetTags()),
    "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
    "ipolicy": cluster_info.ipolicy,
    }
  ginfo = cfg.GetAllNodeGroupsInfo()
  ninfo = cfg.GetAllNodesInfo()
  iinfo = cfg.GetAllInstancesInfo()
  i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo.values()]

  # only vm_capable nodes are relevant for allocation
  node_list = [n.uuid for n in ninfo.values() if n.vm_capable]

  # pick the hypervisor whose node data we need to query
  if isinstance(self.req, IAReqInstanceAlloc):
    hypervisor_name = self.req.hypervisor
  elif isinstance(self.req, IAReqRelocate):
    hypervisor_name = iinfo[self.req.inst_uuid].hypervisor
  else:
    hypervisor_name = cluster_info.primary_hypervisor

  if not disk_template:
    disk_template = cluster_info.enabled_disk_templates[0]

  node_data = self._ComputeClusterDataNodeInfo([disk_template], node_list,
                                               cluster_info, hypervisor_name)

  node_iinfo = \
    self.rpc.call_all_instances_info(node_list,
                                     cluster_info.enabled_hypervisors,
                                     cluster_info.hvparams)

  data["nodegroups"] = self._ComputeNodeGroupData(cluster_info, ginfo)

  config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
  data["nodes"] = self._ComputeDynamicNodeData(
    ninfo, node_data, node_iinfo, i_list, config_ndata, disk_template)
  assert len(data["nodes"]) == len(ninfo), \
      "Incomplete node data computed"

  data["instances"] = self._ComputeInstanceData(cfg, cluster_info, i_list)

  self.in_data = data
538
@staticmethod
def _ComputeNodeGroupData(cluster, ginfo):
  """Compute node groups data.

  @param cluster: cluster object, used to compute each group's ipolicy
  @type ginfo: dict
  @param ginfo: mapping of group UUID to group object
  @rtype: dict
  @return: mapping of group UUID to the group data dict for the allocator

  """
  ng = dict((guuid, {
    "name": gdata.name,
    "alloc_policy": gdata.alloc_policy,
    "networks": [net_uuid for net_uuid, _ in gdata.networks.items()],
    "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
    "tags": list(gdata.GetTags()),
    })
    for guuid, gdata in ginfo.items())

  return ng
554
555 @staticmethod
557 """Compute global node data.
558
559 @rtype: dict
560 @returns: a dict of name: (node dict, node config)
561
562 """
563
564 node_results = dict((ninfo.name, {
565 "tags": list(ninfo.GetTags()),
566 "primary_ip": ninfo.primary_ip,
567 "secondary_ip": ninfo.secondary_ip,
568 "offline": ninfo.offline,
569 "drained": ninfo.drained,
570 "master_candidate": ninfo.master_candidate,
571 "group": ninfo.group,
572 "master_capable": ninfo.master_capable,
573 "vm_capable": ninfo.vm_capable,
574 "ndparams": cfg.GetNdParams(ninfo),
575 "hv_state": cfg.GetFilledHvStateParams(ninfo)
576 })
577 for ninfo in node_cfg.values())
578
579 return node_results
580
581 @staticmethod
583 """Extract an attribute from the hypervisor's node information.
584
585 This is a helper function to extract data from the hypervisor's information
586 about the node, as part of the result of a node_info query.
587
588 @type hv_info: dict of strings
589 @param hv_info: dictionary of node information from the hypervisor
590 @type node_name: string
591 @param node_name: name of the node
592 @type attr: string
593 @param attr: key of the attribute in the hv_info dictionary
594 @rtype: integer
595 @return: the value of the attribute
596 @raises errors.OpExecError: if key not in dictionary or value not
597 integer
598
599 """
600 if attr not in hv_info:
601 raise errors.OpExecError("Node '%s' didn't return attribute"
602 " '%s'" % (node_name, attr))
603 value = hv_info[attr]
604 if not isinstance(value, int):
605 raise errors.OpExecError("Node '%s' returned invalid value"
606 " for '%s': %s" %
607 (node_name, attr, value))
608 return value
609
@staticmethod
def _ComputeStorageDataFromSpaceInfoByTemplate(
    space_info, node_name, disk_template):
  """Extract storage data from node info.

  @type space_info: see result of the RPC call node info
  @param space_info: the storage reporting part of the result of the RPC call
    node info
  @type node_name: string
  @param node_name: the node's name
  @type disk_template: string
  @param disk_template: the disk template to report space for
  @rtype: 4-tuple of integers
  @return: tuple of storage info (total_disk, free_disk, total_spindles,
    free_spindles)

  """
  storage_type = constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[disk_template]
  if storage_type not in constants.STS_REPORT:
    # this storage type does not report space; return zeroes
    total_disk = total_spindles = 0
    free_disk = free_spindles = 0
  else:
    template_space_info = utils.storage.LookupSpaceInfoByDiskTemplate(
        space_info, disk_template)
    if not template_space_info:
      # FIX: previously the two string parts concatenated to "disktemplate"
      raise errors.OpExecError("Node '%s' didn't return space info for disk"
                               " template '%s'" % (node_name, disk_template))
    total_disk = template_space_info["storage_size"]
    free_disk = template_space_info["storage_free"]

    total_spindles = 0
    free_spindles = 0
    if disk_template in constants.DTS_LVM:
      lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
          space_info, constants.ST_LVM_PV)
      if lvm_pv_info:
        total_spindles = lvm_pv_info["storage_size"]
        free_spindles = lvm_pv_info["storage_free"]
  return (total_disk, free_disk, total_spindles, free_spindles)
649
@staticmethod
def _ComputeStorageDataFromSpaceInfo(space_info, node_name, has_lvm):
  """Extract storage data from node info.

  @type space_info: see result of the RPC call node info
  @param space_info: the storage reporting part of the result of the RPC call
    node info
  @type node_name: string
  @param node_name: the node's name
  @type has_lvm: boolean
  @param has_lvm: whether or not LVM storage information is requested
  @rtype: 4-tuple of integers
  @return: tuple of storage info (total_disk, free_disk, total_spindles,
    free_spindles)

  """
  # TODO: replace this with proper storage reporting
  if has_lvm:
    lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
        space_info, constants.ST_LVM_VG)
    if not lvm_vg_info:
      raise errors.OpExecError("Node '%s' didn't return LVM vg space info."
                               % (node_name))
    total_disk = lvm_vg_info["storage_size"]
    free_disk = lvm_vg_info["storage_free"]
    lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
        space_info, constants.ST_LVM_PV)
    if not lvm_pv_info:
      raise errors.OpExecError("Node '%s' didn't return LVM pv space info."
                               % (node_name))
    total_spindles = lvm_pv_info["storage_size"]
    free_spindles = lvm_pv_info["storage_free"]
  else:
    # we didn't even ask the node for VG status, so use zeros
    total_disk = free_disk = 0
    total_spindles = free_spindles = 0
  return (total_disk, free_disk, total_spindles, free_spindles)
687
@staticmethod
def _ComputeInstanceMemory(instance_list, node_instances_info, node_uuid,
                           input_mem_free):
  """Compute memory used by primary instances.

  @type instance_list: list of tuples
  @param instance_list: pairs of (instance object, filled BE parameters)
  @param node_instances_info: per-node RPC results whose payload maps
      instance name to runtime info (presumably from all_instances_info —
      TODO confirm)
  @param node_uuid: UUID of the node whose primary instances are summed
  @type input_mem_free: int
  @param input_mem_free: free memory as reported by the node
  @rtype: tuple (int, int, int)
  @returns: A tuple of three integers: 1. the sum of memory used by primary
    instances on the node (including the ones that are currently down), 2.
    the sum of memory used by primary instances of the node that are up, 3.
    the amount of memory that is free on the node considering the current
    usage of the instances.

  """
  i_p_mem = i_p_up_mem = 0
  mem_free = input_mem_free
  for iinfo, beinfo in instance_list:
    if iinfo.primary_node == node_uuid:
      i_p_mem += beinfo[constants.BE_MAXMEM]
      if iinfo.name not in node_instances_info[node_uuid].payload:
        # instance not reported as running on the node
        i_used_mem = 0
      else:
        i_used_mem = int(node_instances_info[node_uuid]
                         .payload[iinfo.name]["memory"])
      # reserve the memory an up instance may still grow into
      i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
      if iinfo.admin_state == constants.ADMINST_UP \
          and not iinfo.forthcoming:
        mem_free -= max(0, i_mem_diff)
        i_p_up_mem += beinfo[constants.BE_MAXMEM]
  return (i_p_mem, i_p_up_mem, mem_free)
717
def _ComputeDynamicNodeData(self, node_cfg, node_data, node_iinfo, i_list,
                            node_results, disk_template):
  """Compute global node data.

  @param node_cfg: mapping of node UUID to node object
  @param node_data: result of the node info RPC call, indexed by node UUID
  @param node_iinfo: result of the all-instances-info RPC call, indexed by
      node UUID
  @param i_list: list of (instance object, filled BE params) pairs
  @param node_results: the basic node structures as filled from the config
  @param disk_template: disk template to report storage data for

  """
  # copy so the caller's dict is not modified
  node_results = dict(node_results)
  for nuuid, nresult in node_data.items():
    ninfo = node_cfg[nuuid]
    assert ninfo.name in node_results, "Missing basic data for node %s" % \
                                       ninfo.name

    # offline nodes keep only their static data
    if not ninfo.offline:
      nresult.Raise("Can't get data for node %s" % ninfo.name)
      node_iinfo[nuuid].Raise("Can't get node instance info from node %s" %
                              ninfo.name)
      (_, space_info, (hv_info, )) = nresult.payload

      mem_free = self._GetAttributeFromHypervisorNodeData(hv_info, ninfo.name,
                                                          "memory_free")

      (i_p_mem, i_p_up_mem, mem_free) = self._ComputeInstanceMemory(
          i_list, node_iinfo, nuuid, mem_free)
      (total_disk, free_disk, total_spindles, free_spindles) = \
          self._ComputeStorageDataFromSpaceInfoByTemplate(
              space_info, ninfo.name, disk_template)

      # merge dynamic (RPC-derived) values under the static ones; the
      # update() order lets static config data take precedence on clashes
      pnr_dyn = {
        "total_memory": self._GetAttributeFromHypervisorNodeData(
            hv_info, ninfo.name, "memory_total"),
        "reserved_memory": self._GetAttributeFromHypervisorNodeData(
            hv_info, ninfo.name, "memory_dom0"),
        "free_memory": mem_free,
        "total_disk": total_disk,
        "free_disk": free_disk,
        "total_spindles": total_spindles,
        "free_spindles": free_spindles,
        "total_cpus": self._GetAttributeFromHypervisorNodeData(
            hv_info, ninfo.name, "cpu_total"),
        "reserved_cpus": self._GetAttributeFromHypervisorNodeData(
            hv_info, ninfo.name, "cpu_dom0"),
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        }
      pnr_dyn.update(node_results[ninfo.name])
      node_results[ninfo.name] = pnr_dyn

  return node_results
770
@staticmethod
def _ComputeInstanceData(cfg, cluster_info, i_list):
  """Compute global instance data.

  @param cfg: detached cluster config
  @param cluster_info: cluster object, used to fill NIC parameters
  @param i_list: list of (instance object, filled BE params) pairs
  @rtype: dict
  @return: mapping of instance name to the instance data dict for the
      allocator

  """
  instance_data = {}
  for iinfo, beinfo in i_list:
    nic_data = []
    for nic in iinfo.nics:
      filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
      nic_dict = {
        "mac": nic.mac,
        "ip": nic.ip,
        "mode": filled_params[constants.NIC_MODE],
        "link": filled_params[constants.NIC_LINK],
        }
      if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
        # bridged NICs additionally expose their link as "bridge"
        nic_dict["bridge"] = filled_params[constants.NIC_LINK]
      nic_data.append(nic_dict)
    inst_disks = cfg.GetInstanceDisks(iinfo.uuid)
    inst_disktemplate = cfg.GetInstanceDiskTemplate(iinfo.uuid)
    pir = {
      "tags": list(iinfo.GetTags()),
      "admin_state": iinfo.admin_state,
      "vcpus": beinfo[constants.BE_VCPUS],
      "memory": beinfo[constants.BE_MAXMEM],
      "spindle_use": beinfo[constants.BE_SPINDLE_USE],
      "os": iinfo.os,
      "nodes": [cfg.GetNodeName(iinfo.primary_node)] +
               cfg.GetNodeNames(
                 cfg.GetInstanceSecondaryNodes(iinfo.uuid)),
      "nics": nic_data,
      "disks": [{constants.IDISK_TYPE: dsk.dev_type,
                 constants.IDISK_SIZE: dsk.size,
                 constants.IDISK_MODE: dsk.mode,
                 constants.IDISK_SPINDLES: dsk.spindles}
                for dsk in inst_disks],
      "disk_template": inst_disktemplate,
      "disks_active": iinfo.disks_active,
      "hypervisor": iinfo.hypervisor,
      }
    pir["disk_space_total"] = gmi.ComputeDiskSize(pir["disks"])
    instance_data[iinfo.name] = pir

  return instance_data
816
842
def Run(self, name, validate=True, call_fn=None):
  """Run an instance allocator and return the results.

  @type name: string
  @param name: the allocator to run
  @type validate: bool
  @param validate: whether to validate the reply via _ValidateResult
  @param call_fn: callable used to execute the allocator; defaults to the
      iallocator-runner RPC

  """
  if call_fn is None:
    call_fn = self.rpc.call_iallocator_runner

  ial_params = self.cfg.GetDefaultIAllocatorParameters()

  # request-specific parameters override the cluster defaults
  for (key, value) in self.req.GetExtraParams().items():
    ial_params[key] = value

  result = call_fn(self.cfg.GetMasterNode(), name, self.in_text, ial_params)
  result.Raise("Failure while running the iallocator script")

  self.out_text = result.payload
  if validate:
    self._ValidateResult()
861
def _ValidateResult(self):
  """Process the allocator results.

  This will process and if successful save the result in
  self.out_data and the other parameters.

  @raise errors.OpExecError: if the reply cannot be parsed or lacks a
      mandatory key

  """
  try:
    rdict = serializer.Load(self.out_text)
  except Exception as err:  # "except E, err" is Py2-only; "as" works in both
    raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

  if not isinstance(rdict, dict):
    raise errors.OpExecError("Can't parse iallocator results: not a dict")

  # accept replies that carry the node list under "nodes" by mapping it
  # to the "result" key expected below
  if "nodes" in rdict and "result" not in rdict:
    rdict["result"] = rdict["nodes"]
    del rdict["nodes"]

  for key in "success", "info", "result":
    if key not in rdict:
      raise errors.OpExecError("Can't parse iallocator results:"
                               " missing key '%s'" % key)
    setattr(self, key, rdict[key])

  self.req.ValidateResult(self, self.result)
  self.out_data = rdict
890