Package ganeti :: Module config
[hide private]
[frames] | no frames]

Source Code for Module ganeti.config

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Configuration management for Ganeti 
  23   
  24  This module provides the interface to the Ganeti cluster configuration. 
  25   
  26  The configuration data is stored on every node but is updated on the master 
  27  only. After each update, the master distributes the data to the other nodes. 
  28   
  29  Currently, the data storage format is JSON. YAML was slow and consuming too 
  30  much memory. 
  31   
  32  """ 
  33   
  34  # pylint: disable-msg=R0904 
  35  # R0904: Too many public methods 
  36   
  37  import os 
  38  import random 
  39  import logging 
  40  import time 
  41   
  42  from ganeti import errors 
  43  from ganeti import locking 
  44  from ganeti import utils 
  45  from ganeti import constants 
  46  from ganeti import rpc 
  47  from ganeti import objects 
  48  from ganeti import serializer 
  49  from ganeti import uidpool 
  50  from ganeti import netutils 
  51  from ganeti import runtime 
  52   
  53   
  54  _config_lock = locking.SharedLock("ConfigWriter") 
  55   
  56  # job id used for resource management at config upgrade time 
  57  _UPGRADE_CONFIG_JID = "jid-cfg-upgrade" 
def _ValidateConfig(data):
  """Check that a configuration object carries the expected version.

  Only the version of the configuration is examined.

  @param data: the configuration object; its C{version} attribute is
      compared against C{constants.CONFIG_VERSION}
  @raise errors.ConfigVersionMismatch: if the version differs from what
      we expect

  """
  expected = constants.CONFIG_VERSION
  found = data.version
  if found == expected:
    return
  raise errors.ConfigVersionMismatch(expected, found)
71
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  Reservations are kept in C{_ec_reserved}, a dict mapping each holder id
  (C{ec_id}) to the set of resources that holder has reserved.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Check whether any holder has reserved the given resource.

    @param resource: the resource to look for
    @rtype: boolean
    @return: True if the resource is in any holder's reservation set

    """
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource on behalf of a holder.

    @param ec_id: id of the holder making the reservation
    @param resource: the resource to reserve
    @raise errors.ReservationError: if the resource is already reserved
        (by this or any other holder)

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    self._ec_reserved.setdefault(ec_id, set()).add(resource)

  def DropECReservations(self, ec_id):
    """Forget every reservation made by the given holder.

    Unknown holder ids are silently ignored.

    """
    self._ec_reserved.pop(ec_id, None)

  def GetReserved(self):
    """Return the union of all reservations.

    @rtype: set
    @return: a new set with the resources reserved by all holders

    """
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type and reserve it.

    @param existing: iterable of resources already in use; generated
        candidates found in it (or among the current reservations) are
        rejected
    @param generate_one_fn: callable without arguments returning one
        candidate resource (or None, which is rejected)
    @param ec_id: id of the holder the new resource is reserved for
    @return: the newly generated and reserved resource
    @raise errors.ConfigurationError: if no acceptable resource was
        produced within the retry limit

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    new_resource = None
    # A bounded "for" guarantees termination: the previous "while" loop
    # never decremented its counter, so a generator that kept colliding
    # would spin forever and the error below was unreachable.
    for _ in range(64):
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
126
127 128 -class ConfigWriter:
129 """The interface to the cluster configuration. 130 131 @ivar _temporary_lvs: reservation manager for temporary LVs 132 @ivar _all_rms: a list of all temporary reservation managers 133 134 """
def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
             accept_foreign=False):
  """Initialise the in-memory state and open the configuration.

  @param cfg_file: path of the configuration file; when None,
      C{constants.CLUSTER_CONF_FILE} is used
  @param offline: stored as C{_offline}
  @param _getents: stored as C{_getents}
  @param accept_foreign: passed through to C{_OpenConfig}

  """
  if cfg_file is None:
    cfg_file = constants.CLUSTER_CONF_FILE
  self._cfg_file = cfg_file
  self._offline = offline
  self._getents = _getents

  self._lock = _config_lock
  self.write_count = 0
  self._config_data = None

  # temporary reservations: DRBD minors live in a plain dict, everything
  # else in a reservation manager that is also listed in _all_rms
  self._temporary_drbds = {}
  self._temporary_ids = TemporaryReservationManager()
  self._temporary_macs = TemporaryReservationManager()
  self._temporary_secrets = TemporaryReservationManager()
  self._temporary_lvs = TemporaryReservationManager()
  self._all_rms = [
    self._temporary_ids,
    self._temporary_macs,
    self._temporary_secrets,
    self._temporary_lvs,
    ]

  # Note: in order to prevent errors when resolving our name in
  # _DistributeConfig, we compute it here once and reuse it; it's
  # better to raise an error before starting to modify the config
  # file than after it was modified
  self._my_hostname = netutils.Hostname.GetSysName()
  self._last_cluster_serial = -1
  self._cfg_id = None
  self._OpenConfig(accept_foreign)
# this method needs to be static, so that we can call it on the class
@staticmethod
def IsCluster():
  """Tell whether the cluster configuration file exists.

  @rtype: boolean
  @return: True if C{constants.CLUSTER_CONF_FILE} exists on disk

  """
  conf_path = constants.CLUSTER_CONF_FILE
  return os.path.exists(conf_path)
169
def _GenerateOneMAC(self):
  """Build one random MAC address under the cluster MAC prefix.

  @rtype: string
  @return: the cluster's C{mac_prefix} followed by three random bytes,
      each formatted as two lowercase hex digits

  """
  prefix = self._config_data.cluster.mac_prefix
  octets = tuple([random.randrange(0, 256) for _ in range(3)])
  return "%s:%02x:%02x:%02x" % ((prefix,) + octets)
@locking.ssynchronized(_config_lock, shared=1)
def GetNdParams(self, node):
  """Return the node parameters filled in via the cluster's C{FillND}.

  @type node: L{objects.Node}
  @param node: the node we want to know the params for
  @return: the result of the cluster's C{FillND} for the node and the
      node group looked up from C{node.group}

  """
  group = self._UnlockedGetNodeGroup(node.group)
  cluster = self._config_data.cluster
  return cluster.FillND(node, group)
@locking.ssynchronized(_config_lock, shared=1)
def GenerateMAC(self, ec_id):
  """Generate a MAC for an instance.

  The new MAC is checked against the MACs of all current instances and
  against the MACs temporarily reserved by other jobs.

  @param ec_id: id of the job the new MAC is reserved for
  @return: the generated (and now reserved) MAC address

  """
  existing = self._AllMACs()
  # Reserve in the MAC manager (the one ReserveMAC uses); reserving in
  # _temporary_ids hid generated MACs from explicit MAC reservations and
  # vice versa.
  return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)
@locking.ssynchronized(_config_lock, shared=1)
def ReserveMAC(self, mac, ec_id):
  """Reserve a MAC for an instance.

  Only the instances managed by this cluster are checked; collisions
  elsewhere are not detected.

  @param mac: the MAC address to reserve
  @param ec_id: id of the job the MAC is reserved for
  @raise errors.ReservationError: if an instance already uses the MAC

  """
  if mac in self._AllMACs():
    raise errors.ReservationError("mac already in use")
  self._temporary_macs.Reserve(ec_id, mac)
@locking.ssynchronized(_config_lock, shared=1)
def ReserveLV(self, lv_name, ec_id):
  """Reserve a VG/LV pair for an instance.

  @type lv_name: string
  @param lv_name: the logical volume name to reserve
  @param ec_id: id of the job the LV is reserved for
  @raise errors.ReservationError: if an instance already uses the LV

  """
  if lv_name in self._AllLVs():
    raise errors.ReservationError("LV already in use")
  self._temporary_lvs.Reserve(ec_id, lv_name)
@locking.ssynchronized(_config_lock, shared=1)
def GenerateDRBDSecret(self, ec_id):
  """Generate a DRBD secret.

  The secrets of the current disks are passed as the already-used set.

  @param ec_id: id of the job the secret is reserved for
  @return: the generated secret

  """
  in_use = self._AllDRBDSecrets()
  secrets_rm = self._temporary_secrets
  return secrets_rm.Generate(in_use, utils.GenerateSecret, ec_id)
241
def _AllLVs(self):
  """Collect the LV names of all instances.

  @rtype: set
  @return: every entry of every list in each instance's C{MapLVsByNode()}

  """
  found = set()
  for inst in self._config_data.instances.values():
    for node_lvs in inst.MapLVsByNode().values():
      found.update(node_lvs)
  return found
252
def _AllIDs(self, include_temporary):
  """Compute the set of all UUIDs and names we have.

  @type include_temporary: boolean
  @param include_temporary: whether to include the _temporary_ids set
  @rtype: set
  @return: LV names, instance and node keys and the non-empty UUIDs of
      all objects returned by C{_AllUUIDObjects}

  """
  config = self._config_data
  ids = set()
  if include_temporary:
    ids.update(self._temporary_ids.GetReserved())
  ids.update(self._AllLVs())
  ids.update(config.instances.keys())
  ids.update(config.nodes.keys())
  for obj in self._AllUUIDObjects():
    if obj.uuid:
      ids.add(obj.uuid)
  return ids
270
def _GenerateUniqueID(self, ec_id):
  """Generate a unique UUID and reserve it for the given job.

  Candidates come from C{utils.NewUUID} and are checked against the IDs
  returned by C{_AllIDs} (without temporary ones) and against the
  reservations in C{_temporary_ids}.

  @param ec_id: id of the job the UUID is reserved for
  @rtype: string
  @return: the unique id

  """
  known_ids = self._AllIDs(include_temporary=False)
  id_manager = self._temporary_ids
  return id_manager.Generate(known_ids, utils.NewUUID, ec_id)
@locking.ssynchronized(_config_lock, shared=1)
def GenerateUniqueID(self, ec_id):
  """Generate a unique ID.

  Wrapper around L{_GenerateUniqueID}.

  @type ec_id: string
  @param ec_id: unique id for the job to reserve the id to
  @return: the generated id

  """
  new_id = self._GenerateUniqueID(ec_id)
  return new_id
295
def _AllMACs(self):
  """Return the MACs of all NICs of all instances in the config.

  @rtype: list
  @return: the list of all MACs

  """
  return [nic.mac
          for inst in self._config_data.instances.values()
          for nic in inst.nics]
309
def _AllDRBDSecrets(self):
  """Return all DRBD secrets present in the config.

  The disks of every instance are walked recursively (children
  included); for each DRBD8 device the sixth element of its logical id
  is collected.

  @rtype: list
  @return: the list of all DRBD secrets

  """
  def helper(disk, result):
    """Recursively gather secrets from this disk."""
    # dev_type is compared against LD_DRBD8 everywhere else in this
    # class; DT_DRBD8 was used here before, inconsistently
    if disk.dev_type == constants.LD_DRBD8:
      result.append(disk.logical_id[5])
    if disk.children:
      for child in disk.children:
        helper(child, result)

  result = []
  for instance in self._config_data.instances.values():
    for disk in instance.disks:
      helper(disk, result)

  return result
331
def _CheckDiskIDs(self, disk, l_ids, p_ids):
  """Find duplicate disk IDs in a disk tree.

  IDs seen for the first time are appended to C{l_ids}/C{p_ids}, so the
  same lists can be passed across calls to detect duplicates between
  disks.

  @type disk: L{objects.Disk}
  @param disk: the disk at which to start searching
  @type l_ids: list
  @param l_ids: list of current logical ids
  @type p_ids: list
  @param p_ids: list of current physical ids
  @rtype: list
  @return: a list of error messages

  """
  errs = []

  def _Track(value, seen, template):
    """Record one id, or report it if it was already seen."""
    if value is None:
      return
    if value in seen:
      errs.append(template % str(value))
    else:
      seen.append(value)

  _Track(disk.logical_id, l_ids, "duplicate logical id %s")
  _Track(disk.physical_id, p_ids, "duplicate physical id %s")

  for child in disk.children or ():
    errs.extend(self._CheckDiskIDs(child, l_ids, p_ids))
  return errs
361
362 - def _UnlockedVerifyConfig(self):
363 """Verify function. 364 365 @rtype: list 366 @return: a list of error messages; a non-empty list signifies 367 configuration errors 368 369 """ 370 # pylint: disable-msg=R0914 371 result = [] 372 seen_macs = [] 373 ports = {} 374 data = self._config_data 375 cluster = data.cluster 376 seen_lids = [] 377 seen_pids = [] 378 379 # global cluster checks 380 if not cluster.enabled_hypervisors: 381 result.append("enabled hypervisors list doesn't have any entries") 382 invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES 383 if invalid_hvs: 384 result.append("enabled hypervisors contains invalid entries: %s" % 385 invalid_hvs) 386 missing_hvp = (set(cluster.enabled_hypervisors) - 387 set(cluster.hvparams.keys())) 388 if missing_hvp: 389 result.append("hypervisor parameters missing for the enabled" 390 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp)) 391 392 if cluster.master_node not in data.nodes: 393 result.append("cluster has invalid primary node '%s'" % 394 cluster.master_node) 395 396 def _helper(owner, attr, value, template): 397 try: 398 utils.ForceDictType(value, template) 399 except errors.GenericError, err: 400 result.append("%s has invalid %s: %s" % (owner, attr, err))
401 402 def _helper_nic(owner, params): 403 try: 404 objects.NIC.CheckParameterSyntax(params) 405 except errors.ConfigurationError, err: 406 result.append("%s has invalid nicparams: %s" % (owner, err)) 407 408 # check cluster parameters 409 _helper("cluster", "beparams", cluster.SimpleFillBE({}), 410 constants.BES_PARAMETER_TYPES) 411 _helper("cluster", "nicparams", cluster.SimpleFillNIC({}), 412 constants.NICS_PARAMETER_TYPES) 413 _helper_nic("cluster", cluster.SimpleFillNIC({})) 414 _helper("cluster", "ndparams", cluster.SimpleFillND({}), 415 constants.NDS_PARAMETER_TYPES) 416 417 # per-instance checks 418 for instance_name in data.instances: 419 instance = data.instances[instance_name] 420 if instance.name != instance_name: 421 result.append("instance '%s' is indexed by wrong name '%s'" % 422 (instance.name, instance_name)) 423 if instance.primary_node not in data.nodes: 424 result.append("instance '%s' has invalid primary node '%s'" % 425 (instance_name, instance.primary_node)) 426 for snode in instance.secondary_nodes: 427 if snode not in data.nodes: 428 result.append("instance '%s' has invalid secondary node '%s'" % 429 (instance_name, snode)) 430 for idx, nic in enumerate(instance.nics): 431 if nic.mac in seen_macs: 432 result.append("instance '%s' has NIC %d mac %s duplicate" % 433 (instance_name, idx, nic.mac)) 434 else: 435 seen_macs.append(nic.mac) 436 if nic.nicparams: 437 filled = cluster.SimpleFillNIC(nic.nicparams) 438 owner = "instance %s nic %d" % (instance.name, idx) 439 _helper(owner, "nicparams", 440 filled, constants.NICS_PARAMETER_TYPES) 441 _helper_nic(owner, filled) 442 443 # parameter checks 444 if instance.beparams: 445 _helper("instance %s" % instance.name, "beparams", 446 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES) 447 448 # gather the drbd ports for duplicate checks 449 for dsk in instance.disks: 450 if dsk.dev_type in constants.LDS_DRBD: 451 tcp_port = dsk.logical_id[2] 452 if tcp_port not in ports: 453 ports[tcp_port] = 
[] 454 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name)) 455 # gather network port reservation 456 net_port = getattr(instance, "network_port", None) 457 if net_port is not None: 458 if net_port not in ports: 459 ports[net_port] = [] 460 ports[net_port].append((instance.name, "network port")) 461 462 # instance disk verify 463 for idx, disk in enumerate(instance.disks): 464 result.extend(["instance '%s' disk %d error: %s" % 465 (instance.name, idx, msg) for msg in disk.Verify()]) 466 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids)) 467 468 # cluster-wide pool of free ports 469 for free_port in cluster.tcpudp_port_pool: 470 if free_port not in ports: 471 ports[free_port] = [] 472 ports[free_port].append(("cluster", "port marked as free")) 473 474 # compute tcp/udp duplicate ports 475 keys = ports.keys() 476 keys.sort() 477 for pnum in keys: 478 pdata = ports[pnum] 479 if len(pdata) > 1: 480 txt = utils.CommaJoin(["%s/%s" % val for val in pdata]) 481 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt)) 482 483 # highest used tcp port check 484 if keys: 485 if keys[-1] > cluster.highest_used_port: 486 result.append("Highest used port mismatch, saved %s, computed %s" % 487 (cluster.highest_used_port, keys[-1])) 488 489 if not data.nodes[cluster.master_node].master_candidate: 490 result.append("Master node is not a master candidate") 491 492 # master candidate checks 493 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats() 494 if mc_now < mc_max: 495 result.append("Not enough master candidates: actual %d, target %d" % 496 (mc_now, mc_max)) 497 498 # node checks 499 for node_name, node in data.nodes.items(): 500 if node.name != node_name: 501 result.append("Node '%s' is indexed by wrong name '%s'" % 502 (node.name, node_name)) 503 if [node.master_candidate, node.drained, node.offline].count(True) > 1: 504 result.append("Node %s state is invalid: master_candidate=%s," 505 " drain=%s, offline=%s" % 506 (node.name, 
node.master_candidate, node.drained, 507 node.offline)) 508 if node.group not in data.nodegroups: 509 result.append("Node '%s' has invalid group '%s'" % 510 (node.name, node.group)) 511 else: 512 _helper("node %s" % node.name, "ndparams", 513 cluster.FillND(node, data.nodegroups[node.group]), 514 constants.NDS_PARAMETER_TYPES) 515 516 # nodegroups checks 517 nodegroups_names = set() 518 for nodegroup_uuid in data.nodegroups: 519 nodegroup = data.nodegroups[nodegroup_uuid] 520 if nodegroup.uuid != nodegroup_uuid: 521 result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'" 522 % (nodegroup.name, nodegroup.uuid, nodegroup_uuid)) 523 if utils.UUID_RE.match(nodegroup.name.lower()): 524 result.append("node group '%s' (uuid: '%s') has uuid-like name" % 525 (nodegroup.name, nodegroup.uuid)) 526 if nodegroup.name in nodegroups_names: 527 result.append("duplicate node group name '%s'" % nodegroup.name) 528 else: 529 nodegroups_names.add(nodegroup.name) 530 if nodegroup.ndparams: 531 _helper("group %s" % nodegroup.name, "ndparams", 532 cluster.SimpleFillND(nodegroup.ndparams), 533 constants.NDS_PARAMETER_TYPES) 534 535 536 # drbd minors check 537 _, duplicates = self._UnlockedComputeDRBDMap() 538 for node, minor, instance_a, instance_b in duplicates: 539 result.append("DRBD minor %d on node %s is assigned twice to instances" 540 " %s and %s" % (minor, node, instance_a, instance_b)) 541 542 # IP checks 543 default_nicparams = cluster.nicparams[constants.PP_DEFAULT] 544 ips = {} 545 546 def _AddIpAddress(ip, name): 547 ips.setdefault(ip, []).append(name) 548 549 _AddIpAddress(cluster.master_ip, "cluster_ip") 550 551 for node in data.nodes.values(): 552 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name) 553 if node.secondary_ip != node.primary_ip: 554 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name) 555 556 for instance in data.instances.values(): 557 for idx, nic in enumerate(instance.nics): 558 if nic.ip is None: 559 continue 560 561 
nicparams = objects.FillDict(default_nicparams, nic.nicparams) 562 nic_mode = nicparams[constants.NIC_MODE] 563 nic_link = nicparams[constants.NIC_LINK] 564 565 if nic_mode == constants.NIC_MODE_BRIDGED: 566 link = "bridge:%s" % nic_link 567 elif nic_mode == constants.NIC_MODE_ROUTED: 568 link = "route:%s" % nic_link 569 else: 570 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode) 571 572 _AddIpAddress("%s/%s" % (link, nic.ip), 573 "instance:%s/nic:%d" % (instance.name, idx)) 574 575 for ip, owners in ips.items(): 576 if len(owners) > 1: 577 result.append("IP address %s is used by multiple owners: %s" % 578 (ip, utils.CommaJoin(owners))) 579 580 return result 581 582 @locking.ssynchronized(_config_lock, shared=1)
583 - def VerifyConfig(self):
584 """Verify function. 585 586 This is just a wrapper over L{_UnlockedVerifyConfig}. 587 588 @rtype: list 589 @return: a list of error messages; a non-empty list signifies 590 configuration errors 591 592 """ 593 return self._UnlockedVerifyConfig()
594
def _UnlockedSetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  This is used only for drbd, which needs ip/port configuration; for any
  other device type the physical id is simply the logical id.

  Children are updated first, which helps when only the top device is
  passed to the remote node.

  This function is for internal use, when the config lock is already held.

  @param disk: the disk whose C{physical_id} is to be set
  @param node_name: name of the node the physical id is computed for
  @raise errors.ConfigurationError: if a DRBD disk does not list
      C{node_name} among its nodes, or one of its nodes is unknown

  """
  for child in disk.children or ():
    self._UnlockedSetDiskID(child, node_name)

  if disk.logical_id is None and disk.physical_id is not None:
    return

  if disk.dev_type != constants.LD_DRBD8:
    disk.physical_id = disk.logical_id
    return

  pnode, snode, port, pminor, sminor, secret = disk.logical_id
  if node_name not in (pnode, snode):
    raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                    node_name)
  pnode_info = self._UnlockedGetNodeInfo(pnode)
  snode_info = self._UnlockedGetNodeInfo(snode)
  if pnode_info is None or snode_info is None:
    raise errors.ConfigurationError("Can't find primary or secondary node"
                                    " for %s" % str(disk))
  p_data = (pnode_info.secondary_ip, port)
  s_data = (snode_info.secondary_ip, port)
  # the physical id is (local endpoint, remote endpoint, local minor, secret)
  if pnode == node_name:
    local, remote, minor = p_data, s_data, pminor
  else: # it must be secondary, we tested above
    local, remote, minor = s_data, p_data, sminor
  disk.physical_id = local + remote + (minor, secret)
@locking.ssynchronized(_config_lock)
def SetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  Wrapper around L{_UnlockedSetDiskID}; see there for details.

  @param disk: the disk whose C{physical_id} is to be set
  @param node_name: name of the node the physical id is computed for

  """
  outcome = self._UnlockedSetDiskID(disk, node_name)
  return outcome
@locking.ssynchronized(_config_lock)
def AddTcpUdpPort(self, port):
  """Add a port to the pool of available ports and write the config.

  @type port: int
  @param port: the port to add
  @raise errors.ProgrammerError: if C{port} is not an integer

  """
  if not isinstance(port, int):
    raise errors.ProgrammerError("Invalid type passed for port")

  pool = self._config_data.cluster.tcpudp_port_pool
  pool.add(port)
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def GetPortList(self):
  """Return a copy of the cluster's pool of available ports.

  """
  pool = self._config_data.cluster.tcpudp_port_pool
  return pool.copy()
@locking.ssynchronized(_config_lock)
def AllocatePort(self):
  """Allocate a port.

  The port is taken from the available port pool if that is not empty;
  otherwise it is the one following highest_used_port, which is then
  updated. The configuration is written in both cases.

  @return: the allocated port
  @raise errors.ConfigurationError: if the next port would reach
      C{constants.LAST_DRBD_PORT}

  """
  cluster = self._config_data.cluster
  # If there are TCP/IP ports configured, we use them first.
  if cluster.tcpudp_port_pool:
    port = cluster.tcpudp_port_pool.pop()
  else:
    port = cluster.highest_used_port + 1
    if port >= constants.LAST_DRBD_PORT:
      raise errors.ConfigurationError("The highest used port is greater"
                                      " than %s. Aborting." %
                                      constants.LAST_DRBD_PORT)
    cluster.highest_used_port = port

  self._WriteConfig()
  return port
686
def _UnlockedComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  Both the minors of the configured instance disks and the temporarily
  reserved minors are taken into account.

  @rtype: (dict, list)
  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty dict), and a list of duplicates; if the duplicates
      list is not empty, the configuration is corrupted and its caller
      should raise an exception

  """
  def _AppendUsedPorts(instance_name, disk, used):
    """Record the minors of one disk tree, returning any duplicates."""
    found = []
    if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
      node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
      for node, port in ((node_a, minor_a), (node_b, minor_b)):
        assert node in used, ("Node '%s' of instance '%s' not found"
                              " in node list" % (node, instance_name))
        node_minors = used[node]
        if port in node_minors:
          found.append((node, port, instance_name, node_minors[port]))
        else:
          node_minors[port] = instance_name
    for child in disk.children or ():
      found.extend(_AppendUsedPorts(instance_name, child, used))
    return found

  config = self._config_data
  duplicates = []
  my_dict = {}
  for node in config.nodes:
    my_dict[node] = {}
  for instance in config.instances.itervalues():
    for disk in instance.disks:
      duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
  for (node, minor), instance in self._temporary_drbds.iteritems():
    node_minors = my_dict[node]
    if minor in node_minors and node_minors[minor] != instance:
      duplicates.append((node, minor, instance, node_minors[minor]))
    else:
      node_minors[minor] = instance
  return my_dict, duplicates

@locking.ssynchronized(_config_lock)
def ComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  This is just a wrapper over L{_UnlockedComputeDRBDMap}.

  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty dict).
  @raise errors.ConfigurationError: if duplicate minors were found

  """
  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if not duplicates:
    return d_map
  raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                  str(duplicates))
@locking.ssynchronized(_config_lock)
def AllocateDRBDMinor(self, nodes, instance):
  """Allocate a drbd minor.

  The free minor will be automatically computed from the existing
  devices. A node can be given multiple times in order to allocate
  multiple minors. The result is the list of minors, in the same
  order as the passed nodes. Every allocated minor is also recorded as
  a temporary reservation for the instance.

  @param nodes: the node names to allocate minors on
  @type instance: string
  @param instance: the instance for which we allocate minors
  @rtype: list
  @return: the allocated minors, one per entry in C{nodes}
  @raise errors.ConfigurationError: if the current configuration already
      contains duplicate minors

  """
  assert isinstance(instance, basestring), \
         "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  result = []
  for nname in nodes:
    ndata = d_map[nname]
    if not ndata:
      # no minors used, we can start at 0
      result.append(0)
      ndata[0] = instance
      self._temporary_drbds[(nname, 0)] = instance
      continue

    keys = sorted(ndata.keys())
    minor = utils.FirstFree(keys)
    if minor is None:
      # return the next minor
      # TODO: implement high-limit check
      minor = keys[-1] + 1
    # double-check minor against current instances
    assert minor not in d_map[nname], \
           ("Attempt to reuse allocated DRBD minor %d on node %s,"
            " already allocated to instance %s" %
            (minor, nname, d_map[nname][minor]))
    ndata[minor] = instance
    # double-check minor against reservation
    r_key = (nname, minor)
    assert r_key not in self._temporary_drbds, \
           ("Attempt to reuse reserved DRBD minor %d on node %s,"
            " reserved for instance %s" %
            (minor, nname, self._temporary_drbds[r_key]))
    self._temporary_drbds[r_key] = instance
    result.append(minor)
  logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                nodes, result)
  return result
798
def _UnlockedReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  @type instance: string
  @param instance: the instance for which temporary minors should be
                   released

  """
  assert isinstance(instance, basestring), \
         "Invalid argument passed to ReleaseDRBDMinors"
  # collect the keys first, then delete, so the dict is never modified
  # while being walked
  stale = [key for key, owner in self._temporary_drbds.items()
           if owner == instance]
  for key in stale:
    del self._temporary_drbds[key]
@locking.ssynchronized(_config_lock)
def ReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  This should be called on the error paths; on the success paths
  it's automatically called by the ConfigWriter add and update
  functions.

  This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

  @type instance: string
  @param instance: the instance for which temporary minors should be
                   released

  """
  release = self._UnlockedReleaseDRBDMinors
  release(instance)
@locking.ssynchronized(_config_lock, shared=1)
def GetConfigVersion(self):
  """Return the version stored in the loaded configuration.

  @return: Config version

  """
  config = self._config_data
  return config.version
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterName(self):
  """Return the name of the cluster.

  @return: Cluster name

  """
  cluster = self._config_data.cluster
  return cluster.cluster_name
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNode(self):
  """Return the hostname of the master node for this cluster.

  @return: Master hostname

  """
  cluster = self._config_data.cluster
  return cluster.master_node
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterIP(self):
  """Return the master IP of this cluster.

  @return: Master IP

  """
  cluster = self._config_data.cluster
  return cluster.master_ip
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetdev(self):
  """Return the master network device for this cluster.

  @return: the cluster's C{master_netdev} attribute

  """
  cluster = self._config_data.cluster
  return cluster.master_netdev
@locking.ssynchronized(_config_lock, shared=1)
def GetFileStorageDir(self):
  """Return the file storage dir for this cluster.

  @return: the cluster's C{file_storage_dir} attribute

  """
  cluster = self._config_data.cluster
  return cluster.file_storage_dir
@locking.ssynchronized(_config_lock, shared=1)
def GetHypervisorType(self):
  """Return the hypervisor type for this cluster.

  @return: the first entry of the cluster's enabled hypervisors list

  """
  enabled = self._config_data.cluster.enabled_hypervisors
  return enabled[0]
@locking.ssynchronized(_config_lock, shared=1)
def GetHostKey(self):
  """Return the rsa hostkey from the config.

  @rtype: string
  @return: the rsa hostkey

  """
  cluster = self._config_data.cluster
  return cluster.rsahostkeypub
@locking.ssynchronized(_config_lock, shared=1)
def GetDefaultIAllocator(self):
  """Return the default instance allocator for this cluster.

  @return: the cluster's C{default_iallocator} attribute

  """
  cluster = self._config_data.cluster
  return cluster.default_iallocator
@locking.ssynchronized(_config_lock, shared=1)
def GetPrimaryIPFamily(self):
  """Return the cluster's primary ip family.

  @return: primary ip family

  """
  cluster = self._config_data.cluster
  return cluster.primary_ip_family
@locking.ssynchronized(_config_lock)
def AddNodeGroup(self, group, ec_id, check_uuid=True):
  """Add a node group to the configuration and write the config.

  The actual work is done by L{_UnlockedAddNodeGroup}, which calls
  group.UpgradeConfig() to fill any missing attributes according to
  their default values.

  @type group: L{objects.NodeGroup}
  @param group: the NodeGroup object to add
  @type ec_id: string
  @param ec_id: unique id for the job to use when creating a missing UUID
  @type check_uuid: bool
  @param check_uuid: add an UUID to the group if it doesn't have one or, if
                     it does, ensure that it does not exist in the
                     configuration already

  """
  self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
  self._WriteConfig()
932
def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
  """Add a node group to the configuration.

  The group gets serial number 1 and fresh creation/modification times,
  is upgraded via C{group.UpgradeConfig()} and stored under its UUID;
  the cluster serial number is increased. The configuration is not
  written by this function.

  @param group: the node group object to add
  @param ec_id: passed to C{_EnsureUUID} when C{check_uuid} is set
  @param check_uuid: whether to call C{_EnsureUUID} on the group
  @raise errors.OpPrereqError: if a group with the same name exists

  """
  logging.info("Adding node group %s to configuration", group.name)

  # Some code might need to add a node group with a pre-populated UUID
  # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
  # the "does this UUID" exist already check.
  if check_uuid:
    self._EnsureUUID(group, ec_id)

  # a failed lookup is the good case here: the name is still free
  name_taken = True
  try:
    existing_uuid = self._UnlockedLookupNodeGroup(group.name)
  except errors.OpPrereqError:
    name_taken = False
  if name_taken:
    raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                               " node group (UUID: %s)" %
                               (group.name, existing_uuid),
                               errors.ECODE_EXISTS)

  group.serial_no = 1
  now = time.time()
  group.ctime = now
  group.mtime = now
  group.UpgradeConfig()

  self._config_data.nodegroups[group.uuid] = group
  self._config_data.cluster.serial_no += 1
@locking.ssynchronized(_config_lock)
def RemoveNodeGroup(self, group_uuid):
  """Remove a node group from the configuration and write the config.

  The cluster serial number is increased as part of the removal.

  @type group_uuid: string
  @param group_uuid: the UUID of the node group to remove
  @raise errors.ConfigurationError: if no group with this UUID exists

  """
  logging.info("Removing node group %s from configuration", group_uuid)

  groups = self._config_data.nodegroups
  if group_uuid not in groups:
    raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

  assert len(groups) != 1, \
         "Group '%s' is the only group, cannot be removed" % group_uuid

  del groups[group_uuid]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
981
982 - def _UnlockedLookupNodeGroup(self, target):
983 """Lookup a node group's UUID. 984 985 @type target: string or None 986 @param target: group name or UUID or None to look for the default 987 @rtype: string 988 @return: nodegroup UUID 989 @raises errors.OpPrereqError: when the target group cannot be found 990 991 """ 992 if target is None: 993 if len(self._config_data.nodegroups) != 1: 994 raise errors.OpPrereqError("More than one node group exists. Target" 995 " group must be specified explicitely.") 996 else: 997 return self._config_data.nodegroups.keys()[0] 998 if target in self._config_data.nodegroups: 999 return target 1000 for nodegroup in self._config_data.nodegroups.values(): 1001 if nodegroup.name == target: 1002 return nodegroup.uuid 1003 raise errors.OpPrereqError("Node group '%s' not found" % target, 1004 errors.ECODE_NOENT)
@locking.ssynchronized(_config_lock, shared=1)
def LookupNodeGroup(self, target):
  """Resolve a node group name or UUID to the group's UUID.

  Locked (shared) wrapper around L{_UnlockedLookupNodeGroup}.

  @type target: string or None
  @param target: group name or UUID or None to look for the default
  @rtype: string
  @return: nodegroup UUID

  """
  group_uuid = self._UnlockedLookupNodeGroup(target)
  return group_uuid
1019
1020 - def _UnlockedGetNodeGroup(self, uuid):
1021 """Lookup a node group. 1022 1023 @type uuid: string 1024 @param uuid: group UUID 1025 @rtype: L{objects.NodeGroup} or None 1026 @return: nodegroup object, or None if not found 1027 1028 """ 1029 if uuid not in self._config_data.nodegroups: 1030 return None 1031 1032 return self._config_data.nodegroups[uuid]
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroup(self, uuid):
  """Fetch a node group object by UUID.

  Locked (shared) wrapper around L{_UnlockedGetNodeGroup}.

  @type uuid: string
  @param uuid: group UUID
  @rtype: L{objects.NodeGroup} or None
  @return: nodegroup object, or None if not found

  """
  group = self._UnlockedGetNodeGroup(uuid)
  return group
@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodeGroupsInfo(self):
  """Return a shallow copy of the node group mapping.

  @rtype: dict
  @return: a new dict built from the configuration's C{nodegroups} mapping

  """
  groups = self._config_data.nodegroups
  return dict(groups)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupList(self):
  """Return the keys of the configuration's node group mapping.

  """
  groups = self._config_data.nodegroups
  return groups.keys()
@locking.ssynchronized(_config_lock)
def AddInstance(self, instance, ec_id):
  """Store a newly created instance in the configuration.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @param ec_id: execution context id, passed on to L{_EnsureUUID}
  @raise errors.ProgrammerError: if C{instance} is not an
      L{objects.Instance}
  @raise errors.ConfigurationError: if one of the instance's MAC
      addresses is already in use

  """
  if not isinstance(instance, objects.Instance):
    raise errors.ProgrammerError("Invalid type passed to AddInstance")

  if instance.disk_template != constants.DT_DISKLESS:
    logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name,
                 instance.MapLVsByNode())

  used_macs = self._AllMACs()
  for nic in instance.nics:
    if nic.mac in used_macs:
      raise errors.ConfigurationError("Cannot add instance %s:"
                                      " MAC address '%s' already in use." %
                                      (instance.name, nic.mac))

  self._EnsureUUID(instance, ec_id)

  now = time.time()
  instance.serial_no = 1
  instance.ctime = now
  instance.mtime = now
  self._config_data.instances[instance.name] = instance
  self._config_data.cluster.serial_no += 1
  self._UnlockedReleaseDRBDMinors(instance.name)
  self._WriteConfig()
1092
1093 - def _EnsureUUID(self, item, ec_id):
1094 """Ensures a given object has a valid UUID. 1095 1096 @param item: the instance or node to be checked 1097 @param ec_id: the execution context id for the uuid reservation 1098 1099 """ 1100 if not item.uuid: 1101 item.uuid = self._GenerateUniqueID(ec_id) 1102 elif item.uuid in self._AllIDs(include_temporary=True): 1103 raise errors.ConfigurationError("Cannot add '%s': UUID %s already" 1104 " in use" % (item.name, item.uuid))
1105
1106 - def _SetInstanceStatus(self, instance_name, status):
1107 """Set the instance's status to a given value. 1108 1109 """ 1110 assert isinstance(status, bool), \ 1111 "Invalid status '%s' passed to SetInstanceStatus" % (status,) 1112 1113 if instance_name not in self._config_data.instances: 1114 raise errors.ConfigurationError("Unknown instance '%s'" % 1115 instance_name) 1116 instance = self._config_data.instances[instance_name] 1117 if instance.admin_up != status: 1118 instance.admin_up = status 1119 instance.serial_no += 1 1120 instance.mtime = time.time() 1121 self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceUp(self, instance_name):
  """Set the instance's status to up in the configuration.

  @param instance_name: name of the instance to mark as up

  """
  admin_up = True
  self._SetInstanceStatus(instance_name, admin_up)
@locking.ssynchronized(_config_lock)
def RemoveInstance(self, instance_name):
  """Delete an instance from the configuration and write the result.

  @param instance_name: name of the instance to remove
  @raise errors.ConfigurationError: if the instance is not known

  """
  instances = self._config_data.instances
  if instance_name not in instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

  del instances[instance_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RenameInstance(self, old_name, new_name):
  """Rename an instance within the configuration.

  This is implemented here rather than as RemoveInstance followed by
  AddInstance because only the config writer can guarantee an atomic
  rename.

  @param old_name: current name of the instance
  @param new_name: name the instance should have afterwards
  @raise errors.ConfigurationError: if C{old_name} is not a known instance

  """
  instances = self._config_data.instances
  if old_name not in instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

  inst = instances.pop(old_name)
  inst.name = new_name

  for disk in inst.disks:
    if disk.dev_type != constants.LD_FILE:
      continue
    # File-based disks embed the instance name in their path, so both the
    # logical and the physical id have to follow the rename
    storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
    fname = "disk%s" % disk.iv_name.split("/")[1]
    new_id = (disk.logical_id[0],
              utils.PathJoin(storage_dir, inst.name, fname))
    disk.physical_id = new_id
    disk.logical_id = new_id

  # Force update of ssconf files
  self._config_data.cluster.serial_no += 1

  instances[inst.name] = inst
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceDown(self, instance_name):
  """Set the instance's status to down in the configuration.

  @param instance_name: name of the instance to mark as down

  """
  admin_up = False
  self._SetInstanceStatus(instance_name, admin_up)
1178
1179 - def _UnlockedGetInstanceList(self):
1180 """Get the list of instances. 1181 1182 This function is for internal use, when the config lock is already held. 1183 1184 """ 1185 return self._config_data.instances.keys()
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceList(self):
  """Return the names of all configured instances.

  Locked (shared) wrapper around L{_UnlockedGetInstanceList}.

  @return: array of instances, ex. ['instance2.example.com',
      'instance1.example.com']

  """
  names = self._UnlockedGetInstanceList()
  return names
@locking.ssynchronized(_config_lock, shared=1)
def ExpandInstanceName(self, short_name):
  """Attempt to expand an incomplete instance name.

  The matching is delegated to C{utils.MatchNameComponent}, run
  case-insensitively against the configured instance names.

  @param short_name: the (possibly abbreviated) instance name
  @return: the result of C{utils.MatchNameComponent}

  """
  known_names = self._config_data.instances.keys()
  return utils.MatchNameComponent(short_name, known_names,
                                  case_sensitive=False)
1205
1206 - def _UnlockedGetInstanceInfo(self, instance_name):
1207 """Returns information about an instance. 1208 1209 This function is for internal use, when the config lock is already held. 1210 1211 """ 1212 if instance_name not in self._config_data.instances: 1213 return None 1214 1215 return self._config_data.instances[instance_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
  """Return the configuration object of an instance.

  Only the information stored in the configuration is returned; this is a
  locked (shared) wrapper around L{_UnlockedGetInstanceInfo}.

  @param instance_name: name of the instance, e.g.
      I{instance1.example.com}

  @rtype: L{objects.Instance}
  @return: the instance object, or None if the name is unknown

  """
  info = self._UnlockedGetInstanceInfo(instance_name)
  return info
@locking.ssynchronized(_config_lock, shared=1)
def GetAllInstancesInfo(self):
  """Get the configuration of all instances.

  @rtype: dict
  @return: dict of (instance, instance_info), where instance_info is what
      GetInstanceInfo would return for that instance

  """
  all_info = {}
  for name in self._UnlockedGetInstanceList():
    all_info[name] = self._UnlockedGetInstanceInfo(name)
  return all_info
@locking.ssynchronized(_config_lock)
def AddNode(self, node, ec_id):
  """Store a new node in the configuration and write the result.

  The node is also registered as a member of its node group.

  @type node: L{objects.Node}
  @param node: a Node instance
  @param ec_id: execution context id, passed on to L{_EnsureUUID}

  """
  logging.info("Adding node %s to configuration", node.name)

  self._EnsureUUID(node, ec_id)

  now = time.time()
  node.serial_no = 1
  node.ctime = now
  node.mtime = now
  self._UnlockedAddNodeToGroup(node.name, node.group)
  self._config_data.nodes[node.name] = node
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RemoveNode(self, node_name):
  """Delete a node from the configuration and write the result.

  The node is removed from its node group before being dropped.

  @param node_name: name of the node to remove
  @raise errors.ConfigurationError: if the node is not known

  """
  logging.info("Removing node %s from configuration", node_name)

  nodes = self._config_data.nodes
  if node_name not in nodes:
    raise errors.ConfigurationError("Unknown node '%s'" % node_name)

  self._UnlockedRemoveNodeFromGroup(nodes[node_name])
  del nodes[node_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def ExpandNodeName(self, short_name):
  """Attempt to expand an incomplete node name.

  The matching is delegated to C{utils.MatchNameComponent}, run
  case-insensitively against the configured node names.

  @param short_name: the (possibly abbreviated) node name
  @return: the result of C{utils.MatchNameComponent}

  """
  known_names = self._config_data.nodes.keys()
  return utils.MatchNameComponent(short_name, known_names,
                                  case_sensitive=False)
1288
1289 - def _UnlockedGetNodeInfo(self, node_name):
1290 """Get the configuration of a node, as stored in the config. 1291 1292 This function is for internal use, when the config lock is already 1293 held. 1294 1295 @param node_name: the node name, e.g. I{node1.example.com} 1296 1297 @rtype: L{objects.Node} 1298 @return: the node object 1299 1300 """ 1301 if node_name not in self._config_data.nodes: 1302 return None 1303 1304 return self._config_data.nodes[node_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInfo(self, node_name):
  """Return the configuration object of a node.

  Locked (shared) wrapper around L{_UnlockedGetNodeInfo}.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object, or None if the name is unknown

  """
  info = self._UnlockedGetNodeInfo(node_name)
  return info
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInstances(self, node_name):
  """Get the instances of a node, as stored in the config.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: (list, list)
  @return: a tuple with two lists of instance names: those having the
      node as primary, and those having it among their secondary nodes

  """
  all_instances = self._config_data.instances.values()
  primary = [inst.name for inst in all_instances
             if inst.primary_node == node_name]
  secondary = [inst.name for inst in all_instances
               if node_name in inst.secondary_nodes]
  return (primary, secondary)
1338
1339 - def _UnlockedGetNodeList(self):
1340 """Return the list of nodes which are in the configuration. 1341 1342 This function is for internal use, when the config lock is already 1343 held. 1344 1345 @rtype: list 1346 1347 """ 1348 return self._config_data.nodes.keys()
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeList(self):
  """Return the names of all nodes in the configuration.

  Locked (shared) wrapper around L{_UnlockedGetNodeList}.

  """
  names = self._UnlockedGetNodeList()
  return names
1356
1357 - def _UnlockedGetOnlineNodeList(self):
1358 """Return the list of nodes which are online. 1359 1360 """ 1361 all_nodes = [self._UnlockedGetNodeInfo(node) 1362 for node in self._UnlockedGetNodeList()] 1363 return [node.name for node in all_nodes if not node.offline]
@locking.ssynchronized(_config_lock, shared=1)
def GetOnlineNodeList(self):
  """Return the names of all nodes not marked offline.

  Locked (shared) wrapper around L{_UnlockedGetOnlineNodeList}.

  """
  names = self._UnlockedGetOnlineNodeList()
  return names
@locking.ssynchronized(_config_lock, shared=1)
def GetVmCapableNodeList(self):
  """Return the list of nodes which are vm capable.

  @rtype: list
  @return: names of the nodes whose C{vm_capable} flag is set

  """
  capable = []
  for name in self._UnlockedGetNodeList():
    node = self._UnlockedGetNodeInfo(name)
    if node.vm_capable:
      capable.append(node.name)
  return capable
@locking.ssynchronized(_config_lock, shared=1)
def GetNonVmCapableNodeList(self):
  """Return the list of nodes which are not vm capable.

  @rtype: list
  @return: names of the nodes whose C{vm_capable} flag is not set

  """
  not_capable = []
  for name in self._UnlockedGetNodeList():
    node = self._UnlockedGetNodeInfo(name)
    if not node.vm_capable:
      not_capable.append(node.name)
  return not_capable
@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodesInfo(self):
  """Get the configuration of all nodes.

  @rtype: dict
  @return: dict of (node, node_info), where node_info is what
      GetNodeInfo would return for the node

  """
  all_info = {}
  for name in self._UnlockedGetNodeList():
    all_info[name] = self._UnlockedGetNodeInfo(name)
  return all_info
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupsFromNodes(self, nodes):
  """Return the set of groups the given nodes belong to.

  @type nodes: list of string
  @param nodes: List of node names
  @rtype: frozenset
  @return: the C{group} attribute of every listed node

  """
  groups = set()
  for name in nodes:
    groups.add(self._UnlockedGetNodeInfo(name).group)
  return frozenset(groups)
1413
1414 - def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1415 """Get the number of current and maximum desired and possible candidates. 1416 1417 @type exceptions: list 1418 @param exceptions: if passed, list of nodes that should be ignored 1419 @rtype: tuple 1420 @return: tuple of (current, desired and possible, possible) 1421 1422 """ 1423 mc_now = mc_should = mc_max = 0 1424 for node in self._config_data.nodes.values(): 1425 if exceptions and node.name in exceptions: 1426 continue 1427 if not (node.offline or node.drained) and node.master_capable: 1428 mc_max += 1 1429 if node.master_candidate: 1430 mc_now += 1 1431 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size) 1432 return (mc_now, mc_should, mc_max)
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterCandidateStats(self, exceptions=None):
  """Get the number of current, desired and possible candidates.

  Locked (shared) wrapper around L{_UnlockedGetMasterCandidateStats}.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: tuple
  @return: the tuple returned by L{_UnlockedGetMasterCandidateStats}

  """
  stats = self._UnlockedGetMasterCandidateStats(exceptions)
  return stats
@locking.ssynchronized(_config_lock)
def MaintainCandidatePool(self, exceptions):
  """Try to grow the candidate pool to the desired size.

  Nodes are promoted in random order until the desired number of master
  candidates is reached. The configuration is written only if at least
  one node was promoted.

  @type exceptions: list or None
  @param exceptions: list of node names that should be ignored; None is
      treated like an empty list
  @rtype: list
  @return: list with the adjusted nodes (L{objects.Node} instances)

  """
  # The stats helper accepts None, but the membership test below does not;
  # normalise so that both behave the same way
  if exceptions is None:
    exceptions = []

  mc_now, mc_should, _ = self._UnlockedGetMasterCandidateStats(exceptions)
  mod_list = []
  if mc_now < mc_should:
    node_list = self._config_data.nodes.keys()
    random.shuffle(node_list)
    for name in node_list:
      if mc_now >= mc_should:
        break
      node = self._config_data.nodes[name]
      if (node.master_candidate or node.offline or node.drained or
          node.name in exceptions or not node.master_capable):
        continue
      mod_list.append(node)
      node.master_candidate = True
      node.serial_no += 1
      mc_now += 1
    if mc_now != mc_should:
      # this should not happen
      logging.warning("Warning: MaintainCandidatePool didn't manage to"
                      " fill the candidate pool (%d/%d)", mc_now, mc_should)
    if mod_list:
      self._config_data.cluster.serial_no += 1
      self._WriteConfig()

  return mod_list
1483
1484 - def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1485 """Add a given node to the specified group. 1486 1487 """ 1488 if nodegroup_uuid not in self._config_data.nodegroups: 1489 # This can happen if a node group gets deleted between its lookup and 1490 # when we're adding the first node to it, since we don't keep a lock in 1491 # the meantime. It's ok though, as we'll fail cleanly if the node group 1492 # is not found anymore. 1493 raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid) 1494 if node_name not in self._config_data.nodegroups[nodegroup_uuid].members: 1495 self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1496
1497 - def _UnlockedRemoveNodeFromGroup(self, node):
1498 """Remove a given node from its group. 1499 1500 """ 1501 nodegroup = node.group 1502 if nodegroup not in self._config_data.nodegroups: 1503 logging.warning("Warning: node '%s' has unknown node group '%s'" 1504 " (while being removed from it)", node.name, nodegroup) 1505 nodegroup_obj = self._config_data.nodegroups[nodegroup] 1506 if node.name not in nodegroup_obj.members: 1507 logging.warning("Warning: node '%s' not a member of its node group '%s'" 1508 " (while being removed from it)", node.name, nodegroup) 1509 else: 1510 nodegroup_obj.members.remove(node.name)
1511
1512 - def _BumpSerialNo(self):
1513 """Bump up the serial number of the config. 1514 1515 """ 1516 self._config_data.serial_no += 1 1517 self._config_data.mtime = time.time()
1518
1519 - def _AllUUIDObjects(self):
1520 """Returns all objects with uuid attributes. 1521 1522 """ 1523 return (self._config_data.instances.values() + 1524 self._config_data.nodes.values() + 1525 self._config_data.nodegroups.values() + 1526 [self._config_data.cluster])
1527
def _OpenConfig(self, accept_foreign):
  """Read the config data from disk.

  The file named by C{self._cfg_file} is read, deserialized and validated;
  on success the result is stored in C{self._config_data} and the upgrade
  steps are run.

  @param accept_foreign: if false, a configuration whose master node is
      not this host (C{self._my_hostname}) is rejected
  @raise errors.ConfigurationError: if the data cannot be loaded, is
      missing C{cluster.rsahostkeypub}, or names another node as master
      while C{accept_foreign} is false

  """
  raw_data = utils.ReadFile(self._cfg_file)

  # Any failure while deserializing is reported as a configuration error
  try:
    data = objects.ConfigData.FromDict(serializer.Load(raw_data))
  except Exception, err:
    raise errors.ConfigurationError(err)

  # Make sure the configuration has the right version
  _ValidateConfig(data)

  if (not hasattr(data, 'cluster') or
      not hasattr(data.cluster, 'rsahostkeypub')):
    raise errors.ConfigurationError("Incomplete configuration"
                                    " (missing cluster.rsahostkeypub)")

  if data.cluster.master_node != self._my_hostname and not accept_foreign:
    msg = ("The configuration denotes node %s as master, while my"
           " hostname is %s; opening a foreign configuration is only"
           " possible in accept_foreign mode" %
           (data.cluster.master_node, self._my_hostname))
    raise errors.ConfigurationError(msg)

  # Upgrade configuration if needed
  data.UpgradeConfig()

  self._config_data = data
  # reset the last serial as -1 so that the next write will cause
  # ssconf update
  self._last_cluster_serial = -1

  # And finally run our (custom) config upgrade sequence
  self._UpgradeConfig()

  # Remember the file's identity; _WriteConfig passes it to SafeWriteFile
  self._cfg_id = utils.GetFileID(path=self._cfg_file)
1566
def _UpgradeConfig(self):
  """Run upgrade steps that cannot be done purely in the objects.

  This is because some data elements need uniqueness across the
  whole configuration, etc.

  The steps are: give a UUID to every object lacking one, create the
  initial node group if there is none, assign nodes without a group to
  the default group, and rebuild the member list of every node group.

  @warning: this function will call L{_WriteConfig()}, but also
      L{DropECReservations} so it needs to be called only from a
      "safe" place (the constructor). If one wanted to call it with
      the lock held, a DropECReservationUnlocked would need to be
      created first, to avoid causing deadlock.

  """
  # Tracks whether anything changed that has to be written back
  modified = False
  for item in self._AllUUIDObjects():
    if item.uuid is None:
      item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
      modified = True
  if not self._config_data.nodegroups:
    default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
    default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                          members=[])
    self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
    modified = True
  for node in self._config_data.nodes.values():
    if not node.group:
      # Passing None asks for the single existing group
      node.group = self.LookupNodeGroup(None)
      modified = True
    # This is technically *not* an upgrade, but needs to be done both when
    # nodegroups are being added, and upon normally loading the config,
    # because the members list of a node group is discarded upon
    # serializing/deserializing the object.
    self._UnlockedAddNodeToGroup(node.name, node.group)
  if modified:
    self._WriteConfig()
    # This is ok even if it acquires the internal lock, as _UpgradeConfig is
    # only called at config init time, without the lock held
    self.DropECReservations(_UPGRADE_CONFIG_JID)
1605
1606 - def _DistributeConfig(self, feedback_fn):
1607 """Distribute the configuration to the other nodes. 1608 1609 Currently, this only copies the configuration file. In the future, 1610 it could be used to encapsulate the 2/3-phase update mechanism. 1611 1612 """ 1613 if self._offline: 1614 return True 1615 1616 bad = False 1617 1618 node_list = [] 1619 addr_list = [] 1620 myhostname = self._my_hostname 1621 # we can skip checking whether _UnlockedGetNodeInfo returns None 1622 # since the node list comes from _UnlocketGetNodeList, and we are 1623 # called with the lock held, so no modifications should take place 1624 # in between 1625 for node_name in self._UnlockedGetNodeList(): 1626 if node_name == myhostname: 1627 continue 1628 node_info = self._UnlockedGetNodeInfo(node_name) 1629 if not node_info.master_candidate: 1630 continue 1631 node_list.append(node_info.name) 1632 addr_list.append(node_info.primary_ip) 1633 1634 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file, 1635 address_list=addr_list) 1636 for to_node, to_result in result.items(): 1637 msg = to_result.fail_msg 1638 if msg: 1639 msg = ("Copy of file %s to node %s failed: %s" % 1640 (self._cfg_file, to_node, msg)) 1641 logging.error(msg) 1642 1643 if feedback_fn: 1644 feedback_fn(msg) 1645 1646 bad = True 1647 1648 return not bad
1649
def _WriteConfig(self, destination=None, feedback_fn=None):
  """Write the configuration data to persistent storage.

  After writing, the file is distributed via L{_DistributeConfig} and,
  if the cluster serial number grew since the last ssconf update, the
  ssconf files are rewritten on the online nodes (unless offline).

  @param destination: file to write to; defaults to C{self._cfg_file}
  @param feedback_fn: optional callable, invoked with the text of
      consistency and ssconf upload errors
  @raise errors.ConfigurationError: if the write fails with
      C{errors.LockError}, i.e. the file was modified since the last write

  """
  assert feedback_fn is None or callable(feedback_fn)

  # Warn on config errors, but don't abort the save - the
  # configuration has already been modified, and we can't revert;
  # the best we can do is to warn the user and save as is, leaving
  # recovery to the user
  config_errors = self._UnlockedVerifyConfig()
  if config_errors:
    errmsg = ("Configuration data is not consistent: %s" %
              (utils.CommaJoin(config_errors)))
    logging.critical(errmsg)
    if feedback_fn:
      feedback_fn(errmsg)

  if destination is None:
    destination = self._cfg_file
  self._BumpSerialNo()
  txt = serializer.Dump(self._config_data.ToDict())

  getents = self._getents()
  # close=False keeps the descriptor open so the new file ID can be read
  # from it below
  try:
    fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                             close=False, gid=getents.confd_gid, mode=0640)
  except errors.LockError:
    raise errors.ConfigurationError("The configuration file has been"
                                    " modified since the last write, cannot"
                                    " update")
  try:
    self._cfg_id = utils.GetFileID(fd=fd)
  finally:
    os.close(fd)

  self.write_count += 1

  # and redistribute the config file to master candidates
  self._DistributeConfig(feedback_fn)

  # Write ssconf files on all nodes (including locally)
  if self._last_cluster_serial < self._config_data.cluster.serial_no:
    if not self._offline:
      result = rpc.RpcRunner.call_write_ssconf_files(
        self._UnlockedGetOnlineNodeList(),
        self._UnlockedGetSsconfValues())

      # Upload failures are reported but do not abort the write
      for nname, nresu in result.items():
        msg = nresu.fail_msg
        if msg:
          errmsg = ("Error while uploading ssconf files to"
                    " node %s: %s" % (nname, msg))
          logging.warning(errmsg)

          if feedback_fn:
            feedback_fn(errmsg)

    self._last_cluster_serial = self._config_data.cluster.serial_no
1709
def _UnlockedGetSsconfValues(self):
  """Return the values needed by ssconf.

  List-like values are rendered as newline-separated strings.

  @rtype: dict
  @return: a dictionary with keys the ssconf names and values their
      associated value

  """
  join_lines = "\n".join
  cluster = self._config_data.cluster

  instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
  node_names = utils.NiceSort(self._UnlockedGetNodeList())
  node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]

  # "<name> <ip>" pairs, one per node
  primary_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                 for ninfo in node_info]
  secondary_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                   for ninfo in node_info]

  offline_names = [node.name for node in node_info if node.offline]
  online_names = [node.name for node in node_info if not node.offline]
  candidate_names = [node.name for node in node_info
                     if node.master_candidate]
  candidate_ips = [node.primary_ip for node in node_info
                   if node.master_candidate]

  # "<uuid> <name>" pairs, one per node group
  group_entries = ["%s %s" % (nodegroup.uuid, nodegroup.name)
                   for nodegroup in self._config_data.nodegroups.values()]

  uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

  return {
    constants.SS_CLUSTER_NAME: cluster.cluster_name,
    constants.SS_CLUSTER_TAGS: join_lines(cluster.GetTags()),
    constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
    constants.SS_MASTER_CANDIDATES: join_lines(candidate_names),
    constants.SS_MASTER_CANDIDATES_IPS: join_lines(candidate_ips),
    constants.SS_MASTER_IP: cluster.master_ip,
    constants.SS_MASTER_NETDEV: cluster.master_netdev,
    constants.SS_MASTER_NODE: cluster.master_node,
    constants.SS_NODE_LIST: join_lines(node_names),
    constants.SS_NODE_PRIMARY_IPS: join_lines(primary_ips),
    constants.SS_NODE_SECONDARY_IPS: join_lines(secondary_ips),
    constants.SS_OFFLINE_NODES: join_lines(offline_names),
    constants.SS_ONLINE_NODES: join_lines(online_names),
    constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
    constants.SS_INSTANCE_LIST: join_lines(instance_names),
    constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
    constants.SS_HYPERVISOR_LIST: join_lines(cluster.enabled_hypervisors),
    constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
    constants.SS_UID_POOL: uid_pool,
    constants.SS_NODEGROUPS: join_lines(utils.NiceSort(group_entries)),
    }
@locking.ssynchronized(_config_lock, shared=1)
def GetSsconfValues(self):
  """Return the values needed by ssconf.

  Locked (shared) wrapper around L{_UnlockedGetSsconfValues}.

  """
  values = self._UnlockedGetSsconfValues()
  return values
@locking.ssynchronized(_config_lock, shared=1)
def GetVGName(self):
  """Return the cluster's volume group name.

  """
  cluster = self._config_data.cluster
  return cluster.volume_group_name
@locking.ssynchronized(_config_lock)
def SetVGName(self, vg_name):
  """Set the cluster's volume group name and write the configuration.

  @param vg_name: the new volume group name

  """
  cluster = self._config_data.cluster
  cluster.volume_group_name = vg_name
  cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def GetDRBDHelper(self):
  """Return the cluster's DRBD usermode helper.

  """
  cluster = self._config_data.cluster
  return cluster.drbd_usermode_helper
@locking.ssynchronized(_config_lock)
def SetDRBDHelper(self, drbd_helper):
  """Set the cluster's DRBD usermode helper and write the configuration.

  @param drbd_helper: the new DRBD usermode helper

  """
  cluster = self._config_data.cluster
  cluster.drbd_usermode_helper = drbd_helper
  cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def GetMACPrefix(self):
  """Return the cluster's MAC prefix.

  """
  cluster = self._config_data.cluster
  return cluster.mac_prefix
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterInfo(self):
  """Return the cluster object stored in the configuration.

  @rtype: L{objects.Cluster}
  @return: the cluster object

  """
  cluster = self._config_data.cluster
  return cluster
@locking.ssynchronized(_config_lock, shared=1)
def HasAnyDiskOfType(self, dev_type):
  """Check if there is a disk of the given type in the configuration.

  The check is delegated to the configuration data object.

  @param dev_type: the disk type to look for

  """
  data = self._config_data
  return data.HasAnyDiskOfType(dev_type)
@locking.ssynchronized(_config_lock)
def Update(self, target, feedback_fn):
  """Notify function to be called after updates.

  This function must be called when an object (as returned by
  GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
  caller wants the modifications saved to the backing store. Note
  that all modified objects will be saved, but the target argument
  is the one the caller wants to ensure that it's saved.

  @param target: an instance of either L{objects.Cluster},
      L{objects.Node}, L{objects.Instance} or L{objects.NodeGroup}
      which is existing in the cluster
  @param feedback_fn: Callable feedback function
  @raise errors.ProgrammerError: if no configuration is loaded or the
      target has an unsupported type
  @raise errors.ConfigurationError: if the target is not part of the
      current configuration

  """
  data = self._config_data
  if data is None:
    raise errors.ProgrammerError("Configuration file not read,"
                                 " cannot save.")

  bump_cluster_serial = False
  if isinstance(target, objects.Cluster):
    known = target == data.cluster
  elif isinstance(target, objects.Node):
    known = target in data.nodes.values()
    bump_cluster_serial = True
  elif isinstance(target, objects.Instance):
    known = target in data.instances.values()
  elif isinstance(target, objects.NodeGroup):
    known = target in data.nodegroups.values()
  else:
    raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                 " ConfigWriter.Update" % type(target))

  if not known:
    raise errors.ConfigurationError("Configuration updated since object"
                                    " has been read or unknown object")

  now = time.time()
  target.serial_no += 1
  target.mtime = now

  if bump_cluster_serial:
    # for node updates, we need to increase the cluster serial too
    data.cluster.serial_no += 1
    data.cluster.mtime = now

  if isinstance(target, objects.Instance):
    self._UnlockedReleaseDRBDMinors(target.name)

  self._WriteConfig(feedback_fn=feedback_fn)
@locking.ssynchronized(_config_lock)
def DropECReservations(self, ec_id):
  """Drop per-execution-context reservations.

  The request is forwarded to every entry of C{self._all_rms}.

  @param ec_id: the execution context id whose reservations are dropped

  """
  for manager in self._all_rms:
    manager.DropECReservations(ec_id)
1889