"""Module implementing the iallocator code."""

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import outils
from ganeti import opcodes
from ganeti import rpc
from ganeti import serializer
from ganeti import utils

import ganeti.masterd.instance as gmi


_STRING_LIST = ht.TListOf(ht.TString)
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
  "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
                       opcodes.OpInstanceMigrate.OP_ID,
                       opcodes.OpInstanceReplaceDisks.OP_ID]),
  })))

_NEVAC_MOVED = \
  ht.TListOf(ht.TAnd(ht.TIsLength(3),
                     ht.TItems([ht.TNonEmptyString,
                                ht.TNonEmptyString,
                                ht.TListOf(ht.TNonEmptyString),
                                ])))
_NEVAC_FAILED = \
  ht.TListOf(ht.TAnd(ht.TIsLength(2),
                     ht.TItems([ht.TNonEmptyString,
                                ht.TMaybeString,
                                ])))
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
                        ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
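# Shape of a node-evacuation result, as enforced by _NEVAC_RESULT above
# (illustrative values only, not taken from a real run):
#   [[["inst1", "group1", ["node3", "node4"]]],   # _NEVAC_MOVED entries
#    [["inst2", "failure reason"]],               # _NEVAC_FAILED entries
#    [[{"OP_ID": "OP_INSTANCE_MIGRATE"}]]]        # _JOB_LIST: opcodes per job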

_INST_NAME = ("name", ht.TNonEmptyString)


class _AutoReqParam(outils.AutoSlots):
  """Meta class for request definitions.

  """
  @classmethod
  def _GetSlots(mcs, attrs):
    """Extract the slots out of REQ_PARAMS.

    """
    params = attrs.setdefault("REQ_PARAMS", [])
    return [slot for (slot, _) in params]

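# Request classes describe their parameters through REQ_PARAMS, a list of
# (name, validator) pairs.  The metaclass collects the names into slots, so
# only declared parameters can be set on a request object, and Validate()
# (invoked from the constructor below) is expected to check every supplied
# value against its validator.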

class IARequestBase(outils.ValidatedSlots):
  """A generic IAllocator request object.

  """
  __metaclass__ = _AutoReqParam

  MODE = NotImplemented
  REQ_PARAMS = []
  REQ_RESULT = NotImplemented

  def __init__(self, **kwargs):
    """Constructor for IARequestBase.

    The constructor takes only keyword arguments and will set
    attributes on this object based on the passed arguments. As such,
    it means that you should not pass arguments which are not in the
    REQ_PARAMS attribute for this class.

    """
    outils.ValidatedSlots.__init__(self, **kwargs)

    self.Validate()

  def GetRequest(self, cfg):
    """Gets the request data dict.

    @param cfg: The configuration instance

    """
    raise NotImplementedError

  def ValidateResult(self, ia, result):
    """Validates the result of a request.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    if ia.success and not self.REQ_RESULT(result):
      raise errors.ResultValidationError("iallocator returned invalid result,"
                                         " expected %s, got %s" %
                                         (self.REQ_RESULT, result))


class IAReqInstanceAlloc(IARequestBase):
  """An instance allocation request.

  """
  MODE = constants.IALLOCATOR_MODE_ALLOC
  REQ_PARAMS = [
    _INST_NAME,
    ("memory", ht.TNonNegativeInt),
    ("spindle_use", ht.TNonNegativeInt),
    ("disks", ht.TListOf(ht.TDict)),
    ("disk_template", ht.TString),
    ("os", ht.TString),
    ("tags", _STRING_LIST),
    ("nics", ht.TListOf(ht.TDict)),
    ("vcpus", ht.TInt),
    ("hypervisor", ht.TString),
    ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
    ]
  REQ_RESULT = ht.TList

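  # Illustrative use only (all parameter values below are made up): an
  # allocation request is built with keyword arguments matching REQ_PARAMS,
  # e.g.
  #   req = IAReqInstanceAlloc(name="inst1.example.com", memory=1024,
  #                            spindle_use=1, disks=[{"size": 10240}],
  #                            disk_template=constants.DT_PLAIN, os="debian",
  #                            tags=[], nics=[{}], vcpus=2,
  #                            hypervisor="xen-pvm", node_whitelist=None)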
  def RequiredNodes(self):
    """Calculates the required nodes based on the disk_template.

    """
    if self.disk_template in constants.DTS_INT_MIRROR:
      return 2
    else:
      return 1

  def GetRequest(self, cfg):
    """Requests a new instance.

    The checks for the completeness of the opcode must have already been
    done.

    """
    disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)

    return {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.memory,
      "spindle_use": self.spindle_use,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.RequiredNodes(),
      "hypervisor": self.hypervisor,
      }


class IAReqRelocate(IARequestBase):
  """A relocation request.

  """
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    _INST_NAME,
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

  def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    """
    IARequestBase.ValidateResult(self, ia, result)

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if ia.success and not set(result_groups).issubset(request_groups):
      raise errors.ResultValidationError("Groups of nodes returned by"
                                         " iallocator (%s) differ from"
                                         " original groups (%s)" %
                                         (utils.CommaJoin(result_groups),
                                          utils.CommaJoin(request_groups)))

  @staticmethod
  def _NodesToGroups(node2group, groups, nodes):
    """Returns a list of unique group names for a list of nodes.

    @type node2group: dict
    @param node2group: Map from node name to group UUID
    @type groups: dict
    @param groups: Group information
    @type nodes: list
    @param nodes: Node names

    """
    result = set()

    for node in nodes:
      try:
        group_uuid = node2group[node]
      except KeyError:
        # Unknown node names are silently skipped
        pass
      else:
        try:
          group = groups[group_uuid]
        except KeyError:
          # Unknown group, fall back to the UUID itself
          group_name = group_uuid
        else:
          group_name = group["name"]

        result.add(group_name)

    return sorted(result)

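  # For example (illustrative data), with node2group = {"node1": "uuid-a"},
  # groups = {"uuid-a": {"name": "default"}} and nodes = ["node1", "node9"],
  # _NodesToGroups returns ["default"]: unknown nodes are skipped and known
  # group UUIDs are translated into group names.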


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_data, in_text, out_data, out_text) that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, result) for
      easy usage

  """

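  # Typical usage (illustrative sketch, not lifted from this module): build a
  # request object, wrap it together with the config and an RPC runner, run a
  # named allocator script and inspect the outcome, e.g.
  #   ia = IAllocator(cfg, rpc_runner,
  #                   IAReqRelocate(name="inst1", relocate_from=["node2"]))
  #   ia.Run("hail", validate=True)
  #   if not ia.success:
  #     raise errors.OpExecError("iallocator failure: %s" % ia.info)
  #   new_nodes = ia.result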
  def __init__(self, cfg, rpc_runner, req):
    self.cfg = cfg
    self.rpc = rpc_runner
    self.req = req
    # buffers for the script input/output, in text and parsed form
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # result attributes filled in by _ValidateResult
    self.success = self.info = self.result = None

    self._BuildInputData(req)

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      "ipolicy": cluster_info.ipolicy,
      }
    ninfo = cfg.GetAllNodesInfo()
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_list = [n.name for n in ninfo.values() if n.vm_capable]

    if isinstance(self.req, IAReqInstanceAlloc):
      hypervisor_name = self.req.hypervisor
      node_whitelist = self.req.node_whitelist
    elif isinstance(self.req, IAReqRelocate):
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
      node_whitelist = None
    else:
      hypervisor_name = cluster_info.primary_hypervisor
      node_whitelist = None

    es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, node_list)
    vg_name = cfg.GetVGName()
    if vg_name is not None:
      has_lvm = True
      vg_req = [vg_name]
    else:
      has_lvm = False
      vg_req = []
    node_data = self.rpc.call_node_info(node_list, vg_req,
                                        [hypervisor_name], es_flags)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)

    data["nodegroups"] = self._ComputeNodeGroupData(cfg)

    config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
                                                 i_list, config_ndata, has_lvm)
    assert len(data["nodes"]) == len(ninfo), \
      "Incomplete node data computed"

    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)

    self.in_data = data

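  # At this point self.in_data holds a dict with the keys "version",
  # "cluster_name", "cluster_tags", "enabled_hypervisors", "ipolicy",
  # "nodegroups", "nodes" and "instances"; Run() later hands the serialized
  # input (self.in_text) to the external allocator script.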
  @staticmethod
  def _ComputeNodeGroupData(cfg):
    """Compute node groups data.

    """
    cluster = cfg.GetClusterInfo()
    ng = dict((guuid, {
      "name": gdata.name,
      "alloc_policy": gdata.alloc_policy,
      "networks": [net_uuid for net_uuid, _ in gdata.networks.items()],
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
      "tags": list(gdata.GetTags()),
      })
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())

    return ng

  @staticmethod
  def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
    """Compute global node data.

    @rtype: dict
    @returns: a dict of name: (node dict, node config)

    """
    # fill in static (config-based) values
    node_results = dict((ninfo.name, {
      "tags": list(ninfo.GetTags()),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      "offline": (ninfo.offline or
                  not (node_whitelist is None or
                       ninfo.name in node_whitelist)),
      "drained": ninfo.drained,
      "master_candidate": ninfo.master_candidate,
      "group": ninfo.group,
      "master_capable": ninfo.master_capable,
      "vm_capable": ninfo.vm_capable,
      "ndparams": cfg.GetNdParams(ninfo),
      })
      for ninfo in node_cfg.values())

    return node_results

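  # Whitelisting is implemented above by reporting every node outside
  # node_whitelist as "offline"; an allocator script therefore treats
  # non-whitelisted nodes the same way as genuinely offline ones.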
  @staticmethod
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
                              node_results, has_lvm):
    """Compute global node data.

    @param node_results: the basic node structures as filled from the config

    """
    # make a copy of the current dict, since it will be modified below
    node_results = dict(node_results)
    for nname, nresult in node_data.items():
      assert nname in node_results, "Missing basic data for node %s" % nname
      ninfo = node_cfg[nname]

      if not ninfo.offline:
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload,
                                             require_vg_info=has_lvm)

        def get_attr(attr):
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          value = remote_info[attr]
          if not isinstance(value, int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, value))
          return value

        mem_free = get_attr("memory_free")

        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MAXMEM]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
            mem_free -= max(0, i_mem_diff)

            if iinfo.admin_state == constants.ADMINST_UP:
              i_p_up_mem += beinfo[constants.BE_MAXMEM]

        # LVM-backed storage information, only if LVM is enabled
        if has_lvm:
          total_disk = get_attr("vg_size")
          free_disk = get_attr("vg_free")
        else:
          # the node was not even asked for VG status, so report zeroes
          total_disk = free_disk = 0

        # dynamic (per-node) values as reported by the node itself
        pnr_dyn = {
          "total_memory": get_attr("memory_total"),
          "reserved_memory": get_attr("memory_dom0"),
          "free_memory": mem_free,
          "total_disk": total_disk,
          "free_disk": free_disk,
          "total_cpus": get_attr("cpu_total"),
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr_dyn.update(node_results[nname])
        node_results[nname] = pnr_dyn

    return node_results

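  # Free memory is reported conservatively: for each primary instance the
  # difference between its configured maximum memory and the memory it is
  # actually using is subtracted from the node's reported free memory, so an
  # allocator does not hand out memory that running instances may still claim.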
  @staticmethod
  def _ComputeInstanceData(cluster_info, i_list):
    """Compute global instance data.

    """
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
        nic_dict = {
          "mac": nic.mac,
          "ip": nic.ip,
          "mode": filled_params[constants.NIC_MODE],
          "link": filled_params[constants.NIC_LINK],
          }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_state": iinfo.admin_state,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MAXMEM],
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{constants.IDISK_SIZE: dsk.size,
                   constants.IDISK_MODE: dsk.mode}
                  for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "disks_active": iinfo.disks_active,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
                                                    pir["disks"])
      instance_data[iinfo.name] = pir

    return instance_data

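  # "disk_space_total" is computed with the same gmi.ComputeDiskSize helper
  # used by IAReqInstanceAlloc.GetRequest above, so existing instances and
  # new allocation requests report their disk footprint consistently.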

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

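  # The call_fn hook lets the RPC call be replaced, e.g. for unit tests; by
  # default the script named by `name` is executed on the master node via the
  # iallocator runner RPC, and its raw output is kept in self.out_text before
  # being parsed by _ValidateResult.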
  def _ValidateResult(self):
    """Process the allocator results.

    This parses the allocator output and, if it is valid, saves it in
    self.out_data and in the success/info/result attributes.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # backwards compatibility: accept the legacy "nodes" key as "result"
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    self.req.ValidateResult(self, self.result)
    self.out_data = rdict
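
  # An allocator script is expected to print a serialized dict with at least
  # the keys "success", "info" and "result" (older scripts may return "nodes"
  # instead of "result").  Illustrative reply for an allocation request:
  #   {"success": true, "info": "allocation successful", "result": ["node2"]}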