1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the iallocator code."""
23
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
27 from ganeti import ht
28 from ganeti import outils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
33
34 import ganeti.masterd.instance as gmi
35
36
# Accepts a list whose items are all strings.
_STRING_LIST = ht.TListOf(ht.TString)

# Accepts a list of lists of dicts; every dict must carry an "OP_ID" naming
# one of the failover, migrate or replace-disks opcodes.
# NOTE(review): the two leading booleans are handed to ht.TStrictDict as-is;
# their meaning is defined there -- confirm before changing them.
_JOB_LIST = ht.TListOf(
    ht.TListOf(
        ht.TStrictDict(True, False, {
            "OP_ID": ht.TElemOf([
                opcodes.OpInstanceFailover.OP_ID,
                opcodes.OpInstanceMigrate.OP_ID,
                opcodes.OpInstanceReplaceDisks.OP_ID,
            ]),
        })))

# Accepts a list of 3-item entries: two non-empty strings followed by a list
# of non-empty strings.
_NEVAC_MOVED = ht.TListOf(
    ht.TAnd(
        ht.TIsLength(3),
        ht.TItems([
            ht.TNonEmptyString,
            ht.TNonEmptyString,
            ht.TListOf(ht.TNonEmptyString),
        ])))

# Accepts a list of 2-item entries: a non-empty string and an optional string.
_NEVAC_FAILED = ht.TListOf(
    ht.TAnd(
        ht.TIsLength(2),
        ht.TItems([
            ht.TNonEmptyString,
            ht.TMaybeString,
        ])))

# Accepts a 3-item sequence: moved entries, failed entries and a job list.
_NEVAC_RESULT = ht.TAnd(
    ht.TIsLength(3),
    ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))

# (slot name, type check) pair shared by the request classes that refer to an
# instance by name.
_INST_NAME = ("name", ht.TNonEmptyString)
64 """Meta class for request definitions.
65
66 """
67 @classmethod
69 """Extract the slots out of REQ_PARAMS.
70
71 """
72 params = attrs.setdefault("REQ_PARAMS", [])
73 return [slot for (slot, _) in params]
74
77 """A generic IAllocator request object.
78
79 """
80 __metaclass__ = _AutoReqParam
81
82 MODE = NotImplemented
83 REQ_PARAMS = []
84 REQ_RESULT = NotImplemented
85
87 """Constructor for IARequestBase.
88
89 The constructor takes only keyword arguments and will set
90 attributes on this object based on the passed arguments. As such,
91 it means that you should not pass arguments which are not in the
92 REQ_PARAMS attribute for this class.
93
94 """
95 outils.ValidatedSlots.__init__(self, **kwargs)
96
97 self.Validate()
98
116
118 """Gets the request data dict.
119
120 @param cfg: The configuration instance
121
122 """
123 raise NotImplementedError
124
def ValidateResult(self, ia, result):
    """Validates the result of a request.

    The result is only checked when the iallocator run was successful; in
    that case it has to satisfy this request's C{REQ_RESULT} check.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    if not ia.success:
        # Nothing to validate for a failed run
        return

    if self.REQ_RESULT(result):
        return

    raise errors.ResultValidationError("iallocator returned invalid result,"
                                       " expected %s, got %s" %
                                       (self.REQ_RESULT, result))
137
140 """An instance allocation request.
141
142 """
143
144 MODE = constants.IALLOCATOR_MODE_ALLOC
145 REQ_PARAMS = [
146 _INST_NAME,
147 ("memory", ht.TNonNegativeInt),
148 ("spindle_use", ht.TNonNegativeInt),
149 ("disks", ht.TListOf(ht.TDict)),
150 ("disk_template", ht.TString),
151 ("os", ht.TString),
152 ("tags", _STRING_LIST),
153 ("nics", ht.TListOf(ht.TDict)),
154 ("vcpus", ht.TInt),
155 ("hypervisor", ht.TString),
156 ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
157 ]
158 REQ_RESULT = ht.TList
159
def RequiredNodes(self):
    """Returns the number of nodes needed for the requested disk template.

    @rtype: int
    @return: 2 if C{self.disk_template} is one of
        C{constants.DTS_INT_MIRROR}, 1 otherwise

    """
    if self.disk_template not in constants.DTS_INT_MIRROR:
        return 1

    return 2
168
170 """Requests a new instance.
171
172 The checks for the completeness of the opcode must have already been
173 done.
174
175 """
176 disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
177
178 return {
179 "name": self.name,
180 "disk_template": self.disk_template,
181 "tags": self.tags,
182 "os": self.os,
183 "vcpus": self.vcpus,
184 "memory": self.memory,
185 "spindle_use": self.spindle_use,
186 "disks": self.disks,
187 "disk_space_total": disk_space,
188 "nics": self.nics,
189 "required_nodes": self.RequiredNodes(),
190 "hypervisor": self.hypervisor,
191 }
192
203
227
230 """A relocation request.
231
232 """
233
234 MODE = constants.IALLOCATOR_MODE_RELOC
235 REQ_PARAMS = [
236 _INST_NAME,
237 ("relocate_from", _STRING_LIST),
238 ]
239 REQ_RESULT = ht.TList
240
271
def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    After the generic check done by L{IARequestBase.ValidateResult}, this
    verifies that the node groups of the returned nodes (plus the instance's
    primary node) are a subset of the node groups of the C{relocate_from}
    nodes (plus the instance's primary node).

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    if not ia.success:
        # Group validation only ever fails for successful runs; returning
        # early also avoids touching a result that was not type-checked
        # (e.g. a non-list value returned by a failed run).
        return

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if not set(result_groups).issubset(request_groups):
        raise errors.ResultValidationError("Groups of nodes returned by"
                                           " iallocator (%s) differ from"
                                           " original groups (%s)" %
                                           (utils.CommaJoin(result_groups),
                                            utils.CommaJoin(request_groups)))
294
295 @staticmethod
297 """Returns a list of unique group names for a list of nodes.
298
299 @type node2group: dict
300 @param node2group: Map from node name to group UUID
301 @type groups: dict
302 @param groups: Group information
303 @type nodes: list
304 @param nodes: Node names
305
306 """
307 result = set()
308
309 for node in nodes:
310 try:
311 group_uuid = node2group[node]
312 except KeyError:
313
314 pass
315 else:
316 try:
317 group = groups[group_uuid]
318 except KeyError:
319
320 group_name = group_uuid
321 else:
322 group_name = group["name"]
323
324 result.add(group_name)
325
326 return sorted(result)
327
349
371
374 """IAllocator framework.
375
376 An IAllocator instance has four sets of attributes:
377 - cfg that is needed to query the cluster
378 - input data (all members of the _KEYS class attribute are required)
379 - four buffer attributes (in|out_data|text), that represent the
380 input (to the external script) in text and data structure format,
381 and the output from it, again in two formats
382 - the result variables from the script (success, info, result) for
383 easy usage
384
385 """
386
387
388
389 - def __init__(self, cfg, rpc_runner, req):
390 self.cfg = cfg
391 self.rpc = rpc_runner
392 self.req = req
393
394 self.in_text = self.out_text = self.in_data = self.out_data = None
395
396 self.success = self.info = self.result = None
397
398 self._BuildInputData(req)
399
401 """Compute the generic allocator input data.
402
403 This is the data that is independent of the actual operation.
404
405 """
406 cfg = self.cfg
407 cluster_info = cfg.GetClusterInfo()
408
409 data = {
410 "version": constants.IALLOCATOR_VERSION,
411 "cluster_name": cfg.GetClusterName(),
412 "cluster_tags": list(cluster_info.GetTags()),
413 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
414 "ipolicy": cluster_info.ipolicy,
415 }
416 ninfo = cfg.GetAllNodesInfo()
417 iinfo = cfg.GetAllInstancesInfo().values()
418 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
419
420
421 node_list = [n.name for n in ninfo.values() if n.vm_capable]
422
423 if isinstance(self.req, IAReqInstanceAlloc):
424 hypervisor_name = self.req.hypervisor
425 node_whitelist = self.req.node_whitelist
426 elif isinstance(self.req, IAReqRelocate):
427 hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
428 node_whitelist = None
429 else:
430 hypervisor_name = cluster_info.primary_hypervisor
431 node_whitelist = None
432
433 es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, node_list)
434 vg_name = cfg.GetVGName()
435 if vg_name is not None:
436 has_lvm = True
437 vg_req = [vg_name]
438 else:
439 has_lvm = False
440 vg_req = []
441 node_data = self.rpc.call_node_info(node_list, vg_req,
442 [hypervisor_name], es_flags)
443 node_iinfo = \
444 self.rpc.call_all_instances_info(node_list,
445 cluster_info.enabled_hypervisors)
446
447 data["nodegroups"] = self._ComputeNodeGroupData(cfg)
448
449 config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
450 data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
451 i_list, config_ndata, has_lvm)
452 assert len(data["nodes"]) == len(ninfo), \
453 "Incomplete node data computed"
454
455 data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
456
457 self.in_data = data
458
459 @staticmethod
def _ComputeNodeGroupData(cfg):
    """Compute node groups data.

    @param cfg: The configuration instance
    @rtype: dict
    @return: one entry per item of C{cfg.GetAllNodeGroupsInfo()}, using the
        same keys; each value holds the group's name, allocation policy,
        instance policy (as computed by C{gmi.CalculateGroupIPolicy}) and
        tags

    """
    cluster = cfg.GetClusterInfo()

    groups = {}
    for (guuid, gdata) in cfg.GetAllNodeGroupsInfo().items():
        groups[guuid] = {
            "name": gdata.name,
            "alloc_policy": gdata.alloc_policy,
            "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
            "tags": list(gdata.GetTags()),
        }

    return groups
474
475 @staticmethod
477 """Compute global node data.
478
479 @rtype: dict
480 @returns: a dict of name: (node dict, node config)
481
482 """
483
484 node_results = dict((ninfo.name, {
485 "tags": list(ninfo.GetTags()),
486 "primary_ip": ninfo.primary_ip,
487 "secondary_ip": ninfo.secondary_ip,
488 "offline": (ninfo.offline or
489 not (node_whitelist is None or
490 ninfo.name in node_whitelist)),
491 "drained": ninfo.drained,
492 "master_candidate": ninfo.master_candidate,
493 "group": ninfo.group,
494 "master_capable": ninfo.master_capable,
495 "vm_capable": ninfo.vm_capable,
496 "ndparams": cfg.GetNdParams(ninfo),
497 })
498 for ninfo in node_cfg.values())
499
500 return node_results
501
502 @staticmethod
505 """Compute global node data.
506
507 @param node_results: the basic node structures as filled from the config
508
509 """
510
511
512 node_results = dict(node_results)
513 for nname, nresult in node_data.items():
514 assert nname in node_results, "Missing basic data for node %s" % nname
515 ninfo = node_cfg[nname]
516
517 if not (ninfo.offline or ninfo.drained):
518 nresult.Raise("Can't get data for node %s" % nname)
519 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
520 nname)
521 remote_info = rpc.MakeLegacyNodeInfo(nresult.payload,
522 require_vg_info=has_lvm)
523
524 def get_attr(attr):
525 if attr not in remote_info:
526 raise errors.OpExecError("Node '%s' didn't return attribute"
527 " '%s'" % (nname, attr))
528 value = remote_info[attr]
529 if not isinstance(value, int):
530 raise errors.OpExecError("Node '%s' returned invalid value"
531 " for '%s': %s" %
532 (nname, attr, value))
533 return value
534
535 mem_free = get_attr("memory_free")
536
537
538 i_p_mem = i_p_up_mem = 0
539 for iinfo, beinfo in i_list:
540 if iinfo.primary_node == nname:
541 i_p_mem += beinfo[constants.BE_MAXMEM]
542 if iinfo.name not in node_iinfo[nname].payload:
543 i_used_mem = 0
544 else:
545 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
546 i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
547 mem_free -= max(0, i_mem_diff)
548
549 if iinfo.admin_state == constants.ADMINST_UP:
550 i_p_up_mem += beinfo[constants.BE_MAXMEM]
551
552
553 if has_lvm:
554 total_disk = get_attr("vg_size")
555 free_disk = get_attr("vg_free")
556 else:
557
558 total_disk = free_disk = 0
559
560
561 pnr_dyn = {
562 "total_memory": get_attr("memory_total"),
563 "reserved_memory": get_attr("memory_dom0"),
564 "free_memory": mem_free,
565 "total_disk": total_disk,
566 "free_disk": free_disk,
567 "total_cpus": get_attr("cpu_total"),
568 "i_pri_memory": i_p_mem,
569 "i_pri_up_memory": i_p_up_mem,
570 }
571 pnr_dyn.update(node_results[nname])
572 node_results[nname] = pnr_dyn
573
574 return node_results
575
576 @staticmethod
def _ComputeInstanceData(cluster_info, i_list):
    """Compute global instance data.

    @param cluster_info: the cluster object, used to fill the NIC parameters
    @param i_list: list of (instance, filled backend parameters) pairs
    @rtype: dict
    @return: a dict of instance name: instance dict

    """
    result = {}

    for (inst, be_params) in i_list:
        nics = []
        for nic in inst.nics:
            params = cluster_info.SimpleFillNIC(nic.nicparams)
            entry = {
                "mac": nic.mac,
                "ip": nic.ip,
                "mode": params[constants.NIC_MODE],
                "link": params[constants.NIC_LINK],
            }
            # Bridged NICs additionally report their link as the bridge
            if entry["mode"] == constants.NIC_MODE_BRIDGED:
                entry["bridge"] = entry["link"]
            nics.append(entry)

        inst_data = {
            "tags": list(inst.GetTags()),
            "admin_state": inst.admin_state,
            "vcpus": be_params[constants.BE_VCPUS],
            "memory": be_params[constants.BE_MAXMEM],
            "spindle_use": be_params[constants.BE_SPINDLE_USE],
            "os": inst.os,
            "nodes": [inst.primary_node] + list(inst.secondary_nodes),
            "nics": nics,
            "disks": [{constants.IDISK_SIZE: dsk.size,
                       constants.IDISK_MODE: dsk.mode}
                      for dsk in inst.disks],
            "disk_template": inst.disk_template,
            "hypervisor": inst.hypervisor,
        }
        inst_data["disk_space_total"] = \
            gmi.ComputeDiskSize(inst.disk_template, inst_data["disks"])

        result[inst.name] = inst_data

    return result
615
627
def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and store its output.

    The runner is called with the master node, the allocator name and the
    input text. Its payload is saved in C{self.out_text}.

    @param name: the name of the iallocator, passed on to the runner
    @param validate: whether to process the output via C{_ValidateResult}
    @param call_fn: callable used to run the allocator; defaults to the
        C{call_iallocator_runner} method of C{self.rpc}

    """
    runner = call_fn
    if runner is None:
        runner = self.rpc.call_iallocator_runner

    run_result = runner(self.cfg.GetMasterNode(), name, self.in_text)
    run_result.Raise("Failure while running the iallocator script")
    self.out_text = run_result.payload

    if not validate:
        return

    self._ValidateResult()
641
def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    The text in C{self.out_text} is parsed, has to be a dict and has to
    contain the keys "success", "info" and "result"; their values are
    copied to the attributes of the same name before the request object
    validates the result.

    @raise errors.OpExecError: if the output can't be parsed, is not a dict
        or lacks one of the required keys

    """
    try:
        rdict = serializer.Load(self.out_text)
    except Exception as err:
        # Any parsing problem is reported as an execution error
        raise errors.OpExecError("Can't parse iallocator results: %s" %
                                 str(err))

    if not isinstance(rdict, dict):
        raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # A "nodes" key is accepted in place of "result" when the latter is
    # missing (presumably for older allocator scripts -- confirm)
    if "nodes" in rdict and "result" not in rdict:
        rdict["result"] = rdict["nodes"]
        del rdict["nodes"]

    for key in ("success", "info", "result"):
        if key not in rdict:
            raise errors.OpExecError("Can't parse iallocator results:"
                                     " missing key '%s'" % key)
        setattr(self, key, rdict[key])

    self.req.ValidateResult(self, self.result)
    self.out_data = rdict
670