Package ganeti :: Module config

Source Code for Module ganeti.config

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

# pylint: disable-msg=R0904
# R0904: Too many public methods

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      # count the failed attempt, so we give up after 64 collisions
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
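

# Illustrative sketch (not part of the module): how a reservation manager is
# typically driven. The execution-context id "job-42" and the MAC value are
# hypothetical.
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-42", "aa:00:00:12:34:56")  # claim the resource
#   trm.Reserved("aa:00:00:12:34:56")           # True; other jobs must wait
#   trm.DropECReservations("job-42")            # release when the job ends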


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._OpenConfig(accept_foreign)

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC,
                                         ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current nodes, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("nodegroup '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("nodegroup '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate nodegroup name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
      "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
        ("Attempt to reuse allocated DRBD minor %d on node %s,"
         " already allocated to instance %s" %
         (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
        ("Attempt to reuse reserved DRBD minor %d on node %s,"
         " reserved for instance %s" %
         (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
        released

    """
    assert isinstance(instance, basestring), \
      "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
        released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary IP family.

    @return: primary IP family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one nodegroup exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Nodegroup '%s' not found" % target)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
      "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk_fname = "disk%s" % disk.iv_name.split("/")[1]
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk_fname))

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
        if node.master_candidate:
          mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Warning: node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # nothing more to do for an unknown group; falling through would
      # raise a KeyError on the lookup below
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Warning: node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
      default_nodegroup = objects.NodeGroup(
        uuid=default_nodegroup_uuid,
        name="default",
        members=[],
        )
      self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which exists in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)