Package ganeti :: Package masterd :: Module iallocator
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.masterd.iallocator

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2012, 2013 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the iallocator code.""" 
 23   
 24  from ganeti import compat 
 25  from ganeti import constants 
 26  from ganeti import errors 
 27  from ganeti import ht 
 28  from ganeti import outils 
 29  from ganeti import opcodes 
 30  from ganeti import rpc 
 31  from ganeti import serializer 
 32  from ganeti import utils 
 33   
 34  import ganeti.masterd.instance as gmi 
 35   
 36   
 37  _STRING_LIST = ht.TListOf(ht.TString) 
 38  _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { 
 39     # pylint: disable=E1101 
 40     # Class '...' has no 'OP_ID' member 
 41     "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, 
 42                          opcodes.OpInstanceMigrate.OP_ID, 
 43                          opcodes.OpInstanceReplaceDisks.OP_ID]), 
 44     }))) 
 45   
 46  _NEVAC_MOVED = \ 
 47    ht.TListOf(ht.TAnd(ht.TIsLength(3), 
 48                       ht.TItems([ht.TNonEmptyString, 
 49                                  ht.TNonEmptyString, 
 50                                  ht.TListOf(ht.TNonEmptyString), 
 51                                  ]))) 
 52  _NEVAC_FAILED = \ 
 53    ht.TListOf(ht.TAnd(ht.TIsLength(2), 
 54                       ht.TItems([ht.TNonEmptyString, 
 55                                  ht.TMaybeString, 
 56                                  ]))) 
 57  _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3), 
 58                          ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST])) 
 59   
 60  _INST_NAME = ("name", ht.TNonEmptyString) 
class _AutoReqParam(outils.AutoSlots):
  """Metaclass for request definitions, deriving slots from REQ_PARAMS.

  """
  @classmethod
  def _GetSlots(mcs, attrs):
    """Returns the slot names declared in the class' REQ_PARAMS.

    If the class namespace C{attrs} has no C{REQ_PARAMS} entry, an empty
    list is stored under that key first.

    @param attrs: the namespace dict of the class being created
    @rtype: list
    @return: the first element of every C{(name, validator)} pair

    """
    declared = attrs.setdefault("REQ_PARAMS", [])
    return [name for (name, _validator) in declared]
class IARequestBase(outils.ValidatedSlots):
  """Base class for IAllocator request objects.

  Subclasses set C{MODE}, list their parameters in C{REQ_PARAMS} as
  C{(name, validator)} pairs and provide a result validator in
  C{REQ_RESULT}.

  """
  __metaclass__ = _AutoReqParam

  MODE = NotImplemented
  REQ_PARAMS = []
  REQ_RESULT = NotImplemented

  def __init__(self, **kwargs):
    """Initializes the request and validates its parameters.

    Only keyword arguments are accepted; they are turned into attributes
    of this object, so only names listed in C{REQ_PARAMS} should be passed.

    """
    outils.ValidatedSlots.__init__(self, **kwargs)
    self.Validate()

  def Validate(self):
    """Checks every parameter listed in C{REQ_PARAMS}.

    @raise errors.OpPrereqError: if a parameter is missing or rejected by
        its validator

    """
    assert self.MODE in constants.VALID_IALLOCATOR_MODES

    for (name, check) in self.REQ_PARAMS:
      if not hasattr(self, name):
        msg = "Request is missing '%s' parameter" % name
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      current = getattr(self, name)
      if check(current):
        continue

      msg = ("Request parameter '%s' has invalid type %s/value %s" %
             (name, type(current), current))
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  def GetRequest(self, cfg):
    """Returns the request-specific data dict; must be overridden.

    @param cfg: the configuration instance

    """
    raise NotImplementedError

  def ValidateResult(self, ia, result):
    """Checks a successful allocator result against C{REQ_RESULT}.

    Nothing is checked when the allocator run was not successful.

    @param ia: the IAllocator instance
    @param result: the IAllocator run result
    @raises ResultValidationError: if validation fails

    """
    if not ia.success:
      return

    if self.REQ_RESULT(result):
      return

    raise errors.ResultValidationError("iallocator returned invalid result,"
                                       " expected %s, got %s" %
                                       (self.REQ_RESULT, result))
class IAReqInstanceAlloc(IARequestBase):
  """A request to allocate nodes for a single new instance.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_ALLOC
  REQ_PARAMS = [
    _INST_NAME,
    ("memory", ht.TNonNegativeInt),
    ("spindle_use", ht.TNonNegativeInt),
    ("disks", ht.TListOf(ht.TDict)),
    ("disk_template", ht.TString),
    ("os", ht.TString),
    ("tags", _STRING_LIST),
    ("nics", ht.TListOf(ht.TDict)),
    ("vcpus", ht.TInt),
    ("hypervisor", ht.TString),
    ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
    ]
  REQ_RESULT = ht.TList

  def RequiredNodes(self):
    """Returns the number of nodes needed for the disk template.

    Templates in C{constants.DTS_INT_MIRROR} need two nodes, all others one.

    """
    internally_mirrored = self.disk_template in constants.DTS_INT_MIRROR
    return 2 if internally_mirrored else 1

  def GetRequest(self, cfg):
    """Returns the data describing the instance to allocate.

    The opcode's completeness must already have been checked by the caller.

    @param cfg: the configuration instance (not used by this request type)

    """
    total_disk = gmi.ComputeDiskSize(self.disk_template, self.disks)

    return dict(
      name=self.name,
      disk_template=self.disk_template,
      tags=self.tags,
      os=self.os,
      vcpus=self.vcpus,
      memory=self.memory,
      spindle_use=self.spindle_use,
      disks=self.disks,
      disk_space_total=total_disk,
      nics=self.nics,
      required_nodes=self.RequiredNodes(),
      hypervisor=self.hypervisor,
      )

  def ValidateResult(self, ia, result):
    """Validates the result of a single instance allocation request.

    Besides the generic type check, a successful result must contain
    exactly L{RequiredNodes} entries.

    @raises ResultValidationError: if validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    if not ia.success:
      return

    got = len(result)
    wanted = self.RequiredNodes()
    if got != wanted:
      raise errors.ResultValidationError("iallocator returned invalid number"
                                         " of nodes (%s), required %s" %
                                         (got, wanted))
class IAReqMultiInstanceAlloc(IARequestBase):
  """A request allocating several instances in one allocator run.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
  REQ_PARAMS = [
    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
    ]
  # Successful allocations: list of (non-empty string, list of non-empty
  # strings) pairs
  _MASUCCESS = ht.TListOf(
    ht.TAnd(ht.TIsLength(2),
            ht.TItems([ht.TNonEmptyString,
                       ht.TListOf(ht.TNonEmptyString)])))
  # Failed allocations: list of non-empty strings
  _MAFAILED = ht.TListOf(ht.TNonEmptyString)
  REQ_RESULT = ht.TAnd(ht.TList,
                       ht.TIsLength(2),
                       ht.TItems([_MASUCCESS, _MAFAILED]))

  def GetRequest(self, cfg):
    """Returns the request data for all contained instance requests.

    Each entry is produced by the corresponding L{IAReqInstanceAlloc}.

    @param cfg: the configuration instance, passed on to each sub-request

    """
    per_instance = [single.GetRequest(cfg) for single in self.instances]
    return {
      "instances": per_instance,
      }
class IAReqRelocate(IARequestBase):
  """A relocation request.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    _INST_NAME,
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

  def GetRequest(self, cfg):
    """Requests a relocation of an instance.

    The checks for the completeness of the opcode must have already been
    done.

    @param cfg: the configuration instance
    @raise errors.ProgrammerError: if the instance is unknown
    @raise errors.OpPrereqError: if the instance's disk template is not
        mirrored, or it is internally mirrored but does not have exactly
        one secondary node

    """
    instance = cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_MIRRORED:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if (instance.disk_template in constants.DTS_INT_MIRROR and
        len(instance.secondary_nodes) != 1):
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)

    return {
      "name": self.name,
      "disk_space_total": disk_space,
      # this request type always asks for exactly one node
      "required_nodes": 1,
      "relocate_from": self.relocate_from,
      }

  def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    Besides the generic type check, a successful result must only use node
    groups that are already used by the nodes being relocated from plus the
    instance's primary node.

    @param ia: the IAllocator instance
    @param result: the IAllocator run result
    @raises ResultValidationError: if validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    # The generic check above only type-checks successful results; on
    # failure C{result} may not be a list, so the group check is skipped
    if not ia.success:
      return

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if not set(result_groups).issubset(request_groups):
      raise errors.ResultValidationError("Groups of nodes returned by"
                                         " iallocator (%s) differ from original"
                                         " groups (%s)" %
                                         (utils.CommaJoin(result_groups),
                                          utils.CommaJoin(request_groups)))

  @staticmethod
  def _NodesToGroups(node2group, groups, nodes):
    """Returns a list of unique group names for a list of nodes.

    Unknown nodes are ignored; groups missing from C{groups} are reported
    by their UUID.

    @type node2group: dict
    @param node2group: Map from node name to group UUID
    @type groups: dict
    @param groups: Group information
    @type nodes: list
    @param nodes: Node names
    @rtype: list
    @return: sorted list of group names

    """
    result = set()

    for node in nodes:
      try:
        group_uuid = node2group[node]
      except KeyError:
        # Ignore unknown node
        pass
      else:
        try:
          group = groups[group_uuid]
        except KeyError:
          # Can't find group, let's use UUID
          group_name = group_uuid
        else:
          group_name = group["name"]

        result.add(group_name)

    return sorted(result)
class IAReqNodeEvac(IARequestBase):
  """A request to evacuate instances off their nodes.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Returns the data for a node-evacuate request.

    @param cfg: the configuration instance (not used by this request type)

    """
    return dict(instances=self.instances, evac_mode=self.evac_mode)
class IAReqGroupChange(IARequestBase):
  """A group change request.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Get data for change-group requests.

    @param cfg: the configuration instance (not used by this request type)

    """
    return {
      "instances": self.instances,
      "target_groups": self.target_groups,
      }
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has these sets of attributes:
    - C{cfg} and C{rpc}, used to query the cluster, and C{req}, the
      request object (expected to be an L{IARequestBase} instance)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, result) for
      easy usage

  """
  # pylint: disable=R0902
  # lots of instance attributes

  def __init__(self, cfg, rpc_runner, req):
    """Initializes the allocator and builds its input data for C{req}.

    @param cfg: the cluster configuration
    @param rpc_runner: the RPC runner used for node queries and for running
        the allocator script
    @param req: the request object (expected to be an L{IARequestBase})

    """
    self.cfg = cfg
    self.rpc = rpc_runner
    self.req = req
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init result fields
    self.success = self.info = self.result = None

    self._BuildInputData(req)

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation; it is
    stored in C{self.in_data}.

    @raise errors.ProgrammerError: if a relocation request names an
        instance unknown to the configuration

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      "ipolicy": cluster_info.ipolicy,
      }
    ninfo = cfg.GetAllNodesInfo()
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data; only vm_capable nodes are queried over RPC
    node_list = [n.name for n in ninfo.values() if n.vm_capable]

    # choose the hypervisor to query and the optional node whitelist based
    # on the request type
    if isinstance(self.req, IAReqInstanceAlloc):
      hypervisor_name = self.req.hypervisor
      node_whitelist = self.req.node_whitelist
    elif isinstance(self.req, IAReqRelocate):
      instance = cfg.GetInstanceInfo(self.req.name)
      if instance is None:
        # report the same error as IAReqRelocate.GetRequest instead of
        # failing with an AttributeError on None
        raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                     " IAllocator" % self.req.name)
      hypervisor_name = instance.hypervisor
      node_whitelist = None
    else:
      hypervisor_name = cluster_info.primary_hypervisor
      node_whitelist = None

    es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, node_list)
    vg_name = cfg.GetVGName()
    if vg_name is not None:
      has_lvm = True
      vg_req = [vg_name]
    else:
      has_lvm = False
      vg_req = []
    node_data = self.rpc.call_node_info(node_list, vg_req,
                                        [hypervisor_name], es_flags)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)

    data["nodegroups"] = self._ComputeNodeGroupData(cfg)

    config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
                                                 i_list, config_ndata, has_lvm)
    assert len(data["nodes"]) == len(ninfo), \
      "Incomplete node data computed"

    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)

    self.in_data = data

  @staticmethod
  def _ComputeNodeGroupData(cfg):
    """Compute node groups data.

    @rtype: dict
    @return: a dict mapping group UUID to a dict with the group's name,
        allocation policy, instance policy and tags

    """
    cluster = cfg.GetClusterInfo()
    ng = dict((guuid, {
      "name": gdata.name,
      "alloc_policy": gdata.alloc_policy,
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
      "tags": list(gdata.GetTags()),
      })
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())

    return ng

  @staticmethod
  def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
    """Compute the static, configuration-based node data.

    Nodes not in C{node_whitelist} (when one is given) are reported as
    offline.

    @param cfg: the cluster configuration
    @param node_cfg: dict of node name to node configuration object
    @param node_whitelist: list of node names, or C{None} for no restriction
    @rtype: dict
    @returns: a dict mapping node name to a dict of node attributes

    """
    # fill in static (config-based) values
    node_results = dict((ninfo.name, {
      "tags": list(ninfo.GetTags()),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      "offline": (ninfo.offline or
                  not (node_whitelist is None or
                       ninfo.name in node_whitelist)),
      "drained": ninfo.drained,
      "master_candidate": ninfo.master_candidate,
      "group": ninfo.group,
      "master_capable": ninfo.master_capable,
      "vm_capable": ninfo.vm_capable,
      "ndparams": cfg.GetNdParams(ninfo),
      })
      for ninfo in node_cfg.values())

    return node_results

  @staticmethod
  def _GetNodeAttr(nname, remote_info, attr):
    """Returns an integer attribute from a node's reported information.

    @param nname: the node name, used in error messages
    @param remote_info: the dict built by C{rpc.MakeLegacyNodeInfo}
    @param attr: the key to look up
    @raise errors.OpExecError: if the key is missing or its value is not
        an integer

    """
    if attr not in remote_info:
      raise errors.OpExecError("Node '%s' didn't return attribute"
                               " '%s'" % (nname, attr))
    value = remote_info[attr]
    if not isinstance(value, int):
      raise errors.OpExecError("Node '%s' returned invalid value"
                               " for '%s': %s" %
                               (nname, attr, value))
    return value

  @staticmethod
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
                              node_results, has_lvm):
    """Compute the dynamic (runtime) node data.

    Nodes that are offline or drained keep only their basic data.

    @param node_cfg: dict of node name to node configuration object
    @param node_data: per-node results of the node_info RPC call
    @param node_iinfo: per-node results of the all_instances_info RPC call
    @param i_list: list of (instance, filled beparams) pairs
    @param node_results: the basic node structures as filled from the config
    @param has_lvm: whether volume group data was requested from the nodes
    @rtype: dict
    @return: a new dict; C{node_results} itself is not modified

    """
    #TODO(dynmem): compute the right data on MAX and MIN memory
    # make a copy of the current dict
    node_results = dict(node_results)
    for nname, nresult in node_data.items():
      assert nname in node_results, "Missing basic data for node %s" % nname
      ninfo = node_cfg[nname]

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload,
                                             require_vg_info=has_lvm)

        # bind node name and info now rather than closing over loop variables
        get_attr = compat.partial(IAllocator._GetNodeAttr, nname, remote_info)

        mem_free = get_attr("memory_free")

        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MAXMEM]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
            # reserve the difference between max memory and current usage
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
            mem_free -= max(0, i_mem_diff)

            if iinfo.admin_state == constants.ADMINST_UP:
              i_p_up_mem += beinfo[constants.BE_MAXMEM]

        # TODO: replace this with proper storage reporting
        if has_lvm:
          total_disk = get_attr("vg_size")
          free_disk = get_attr("vg_free")
        else:
          # we didn't even ask the node for VG status, so use zeros
          total_disk = free_disk = 0

        # dynamic node data; basic (config) data takes precedence on update
        pnr_dyn = {
          "total_memory": get_attr("memory_total"),
          "reserved_memory": get_attr("memory_dom0"),
          "free_memory": mem_free,
          "total_disk": total_disk,
          "free_disk": free_disk,
          "total_cpus": get_attr("cpu_total"),
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr_dyn.update(node_results[nname])
        node_results[nname] = pnr_dyn

    return node_results

  @staticmethod
  def _ComputeInstanceData(cluster_info, i_list):
    """Compute global instance data.

    @param cluster_info: the cluster configuration object
    @param i_list: list of (instance, filled beparams) pairs
    @rtype: dict
    @return: a dict mapping instance name to its allocator description

    """
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
        nic_dict = {
          "mac": nic.mac,
          "ip": nic.ip,
          "mode": filled_params[constants.NIC_MODE],
          "link": filled_params[constants.NIC_LINK],
          }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_state": iinfo.admin_state,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MAXMEM],
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{constants.IDISK_SIZE: dsk.size,
                   constants.IDISK_MODE: dsk.mode}
                  for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
                                                    pir["disks"])
      instance_data[iinfo.name] = pir

    return instance_data

  def _BuildInputData(self, req):
    """Build input data structures.

    Fills C{self.in_data} (cluster data plus the request under the
    "request" key) and its serialized form C{self.in_text}.

    """
    self._ComputeClusterData()

    request = req.GetRequest(self.cfg)
    request["type"] = req.MODE
    self.in_data["request"] = request

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator script and store its output.

    The raw output is stored in C{self.out_text}; nothing is returned.

    @param name: the allocator script name, passed to C{call_fn}
    @param validate: whether to parse and validate the output afterwards
    @param call_fn: runner function; defaults to the RPC runner's
        C{call_iallocator_runner}, called on the master node

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    @raise errors.OpExecError: if the output cannot be parsed or lacks a
        required key

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception as err: # pylint: disable=W0703
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # TODO: remove backwards compatibility in later versions
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    self.req.ValidateResult(self, self.result)
    self.out_data = rdict