
Source Code for Module ganeti.config

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON; YAML proved too slow and
consumed too much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import copy
import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime
from ganeti import pathutils
from ganeti import network


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def GetECReserved(self, ec_id):
    """Retrieve all reservations for a specific execution context,
    e.g. when committing reserved IPs for a specific network.

    """
    ec_reserved = set()
    if ec_id in self._ec_reserved:
      ec_reserved.update(self._ec_reserved[ec_id])
    return ec_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1  # without this the retry limit is never enforced
    else:
      raise errors.ConfigurationError("Unable to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
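
A minimal standalone usage sketch (not part of the module source; it assumes
the ganeti package is importable and uses hypothetical job ids):

from ganeti import errors
from ganeti.config import TemporaryReservationManager

trm = TemporaryReservationManager()
trm.Reserve("job-1", "198.51.100.10")
assert trm.Reserved("198.51.100.10")

try:
  # A second execution context cannot grab the same resource.
  trm.Reserve("job-2", "198.51.100.10")
except errors.ReservationError:
  pass

# When the first job finishes, all its reservations are dropped at once.
trm.DropECReservations("job-1")
assert not trm.Reserved("198.51.100.10")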


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


def _CheckInstanceDiskIvNames(disks):
  """Checks if instance's disks' C{iv_name} attributes are in order.

  @type disks: list of L{objects.Disk}
  @param disks: List of disks
  @rtype: list of tuples; (int, string, string)
  @return: List of wrongly named disks, each tuple contains disk index,
      expected and actual name

  """
  result = []

  for (idx, disk) in enumerate(disks):
    exp_iv_name = "disk/%s" % idx
    if disk.iv_name != exp_iv_name:
      result.append((idx, exp_iv_name, disk.iv_name))

  return result
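
For illustration, a sketch of what this check reports, using stand-in objects
in place of L{objects.Disk}:

class _StubDisk:
  # Stand-in for objects.Disk; only the attribute inspected above.
  def __init__(self, iv_name):
    self.iv_name = iv_name

# The second disk should be named "disk/1", so it gets flagged:
wrong = _CheckInstanceDiskIvNames([_StubDisk("disk/0"), _StubDisk("disk/2")])
assert wrong == [(1, "disk/1", "disk/2")]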


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = pathutils.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._temporary_ips = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs,
                     self._temporary_ips]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(pathutils.CLUSTER_CONF_FILE)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceDiskParams(self, instance):
    """Get the disk params populated with inherit chain.

    @type instance: L{objects.Instance}
    @param instance: The instance we want to know the params for
    @return: A dict with the filled in disk params

    """
    node = self._UnlockedGetNodeInfo(instance.primary_node)
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._UnlockedGetGroupDiskParams(nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetGroupDiskParams(self, group):
    """Get the disk params populated with inherit chain.

    @type group: L{objects.NodeGroup}
    @param group: The group we want to know the params for
    @return: A dict with the filled in disk params

    """
    return self._UnlockedGetGroupDiskParams(group)

  def _UnlockedGetGroupDiskParams(self, group):
    """Get the disk params populated with inherit chain down to node-group.

    @type group: L{objects.NodeGroup}
    @param group: The group we want to know the params for
    @return: A dict with the filled in disk params

    """
    return self._config_data.cluster.SimpleFillDP(group.diskparams)

  def _UnlockedGetNetworkMACPrefix(self, net_uuid):
    """Return the network's MAC prefix if set, or the cluster-level default
    otherwise.

    """
    prefix = None
    if net_uuid:
      nobj = self._UnlockedGetNetwork(net_uuid)
      if nobj.mac_prefix:
        prefix = nobj.mac_prefix

    return prefix

  def _GenerateOneMAC(self, prefix=None):
    """Return a function that randomly generates a MAC suffix and appends it
    to the given prefix. If no prefix is given, the cluster-level default is
    used.

    """
    if not prefix:
      prefix = self._config_data.cluster.mac_prefix

    def GenMac():
      byte1 = random.randrange(0, 256)
      byte2 = random.randrange(0, 256)
      byte3 = random.randrange(0, 256)
      mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
      return mac

    return GenMac
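
For illustration, a standalone sketch of how such a generator closure
interacts with TemporaryReservationManager.Generate (the prefix and job id
are hypothetical; not part of the module source):

trm = TemporaryReservationManager()
existing = set(["aa:00:00:11:22:33"])  # MACs already present in the config

def gen_mac():
  # Same shape as the GenMac closure above, with a fixed example prefix.
  return "aa:00:00:%02x:%02x:%02x" % (random.randrange(0, 256),
                                      random.randrange(0, 256),
                                      random.randrange(0, 256))

# Keeps drawing until it finds a MAC that is neither in `existing` nor
# temporarily reserved by another job, then reserves it for "job-42".
mac = trm.Generate(existing, gen_mac, "job-42")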

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, net_uuid, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    prefix = self._UnlockedGetNetworkMACPrefix(net_uuid)
    gen_mac = self._GenerateOneMAC(prefix)
    return self._temporary_ids.Generate(existing, gen_mac, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  def _UnlockedCommitTemporaryIps(self, ec_id):
    """Commit all reserved IP addresses to their respective pools.

    """
    for action, address, net_uuid in self._temporary_ips.GetECReserved(ec_id):
      self._UnlockedCommitIp(action, net_uuid, address)

  def _UnlockedCommitIp(self, action, net_uuid, address):
    """Commit a reserved IP address to an IP pool.

    The IP address is taken from the network's IP pool and marked as reserved.

    """
    nobj = self._UnlockedGetNetwork(net_uuid)
    pool = network.AddressPool(nobj)
    if action == constants.RESERVE_ACTION:
      pool.Reserve(address)
    elif action == constants.RELEASE_ACTION:
      pool.Release(address)

  def _UnlockedReleaseIp(self, net_uuid, address, ec_id):
    """Give a specific IP address back to an IP pool.

    The IP address is returned to the IP pool and is no longer marked as
    reserved.

    """
    self._temporary_ips.Reserve(ec_id,
                                (constants.RELEASE_ACTION, address, net_uuid))

  @locking.ssynchronized(_config_lock, shared=1)
  def ReleaseIp(self, net_uuid, address, ec_id):
    """Give a specified IP address back to an IP pool.

    This is just a wrapper around L{_UnlockedReleaseIp}.

    """
    if net_uuid:
      self._UnlockedReleaseIp(net_uuid, address, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateIp(self, net_uuid, ec_id):
    """Find a free IPv4 address for an instance.

    """
    nobj = self._UnlockedGetNetwork(net_uuid)
    pool = network.AddressPool(nobj)

    def gen_one():
      try:
        ip = pool.GenerateFree()
      except errors.AddressPoolError:
        raise errors.ReservationError("Cannot generate IP. Network is full")
      return (constants.RESERVE_ACTION, ip, net_uuid)

    _, address, _ = self._temporary_ips.Generate([], gen_one, ec_id)
    return address
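
The IP pool changes above are two-phase: a job first records an intent tuple
via the reservation manager, and the tuples are replayed against the pools
only when the instance is committed (see _UnlockedCommitTemporaryIps above).
A standalone sketch with hypothetical values:

trm = TemporaryReservationManager()
trm.Reserve("job-7", (constants.RESERVE_ACTION, "192.0.2.15", "net-uuid-1"))
trm.Reserve("job-7", (constants.RELEASE_ACTION, "192.0.2.99", "net-uuid-1"))

# At commit time each recorded intent is replayed against its pool:
for action, address, net_uuid in trm.GetECReserved("job-7"):
  pass  # ConfigWriter calls self._UnlockedCommitIp(action, net_uuid, address)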

  def _UnlockedReserveIp(self, net_uuid, address, ec_id):
    """Reserve a given IPv4 address for use by an instance.

    """
    nobj = self._UnlockedGetNetwork(net_uuid)
    pool = network.AddressPool(nobj)
    try:
      isreserved = pool.IsReserved(address)
    except errors.AddressPoolError:
      raise errors.ReservationError("IP address not in network")
    if isreserved:
      raise errors.ReservationError("IP address already in use")

    return self._temporary_ips.Reserve(ec_id,
                                       (constants.RESERVE_ACTION,
                                        address, net_uuid))

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveIp(self, net_uuid, address, ec_id):
    """Reserve a given IPv4 address for use by an instance.

    """
    if net_uuid:
      return self._UnlockedReserveIp(net_uuid, address, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames
447
448 - def _AllDisks(self):
449 """Compute the list of all Disks (recursively, including children). 450 451 """ 452 def DiskAndAllChildren(disk): 453 """Returns a list containing the given disk and all of his children. 454 455 """ 456 disks = [disk] 457 if disk.children: 458 for child_disk in disk.children: 459 disks.extend(DiskAndAllChildren(child_disk)) 460 return disks
461 462 disks = [] 463 for instance in self._config_data.instances.values(): 464 for disk in instance.disks: 465 disks.extend(DiskAndAllChildren(disk)) 466 return disks 467
468 - def _AllNICs(self):
469 """Compute the list of all NICs. 470 471 """ 472 nics = [] 473 for instance in self._config_data.instances.values(): 474 nics.extend(instance.nics) 475 return nics
476
477 - def _AllIDs(self, include_temporary):
478 """Compute the list of all UUIDs and names we have. 479 480 @type include_temporary: boolean 481 @param include_temporary: whether to include the _temporary_ids set 482 @rtype: set 483 @return: a set of IDs 484 485 """ 486 existing = set() 487 if include_temporary: 488 existing.update(self._temporary_ids.GetReserved()) 489 existing.update(self._AllLVs()) 490 existing.update(self._config_data.instances.keys()) 491 existing.update(self._config_data.nodes.keys()) 492 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid]) 493 return existing
494

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result
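
For reference, the DRBD8 logical_id layout that makes logical_id[5] the
secret; the field order matches the unpacking in _UnlockedSetDiskID below,
while the values here are hypothetical:

drbd_logical_id = ("node1.example.com",   # primary node
                   "node2.example.com",   # secondary node
                   11000,                 # TCP port shared by both peers
                   0,                     # DRBD minor on the primary
                   0,                     # DRBD minor on the secondary
                   "0123456789abcdef")    # shared secret: logical_id[5]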

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    utils.CommaJoin(invalid_hvs))
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if not cluster.enabled_disk_templates:
      result.append("enabled disk templates list doesn't have any entries")
    invalid_disk_templates = set(cluster.enabled_disk_templates) \
                             - constants.DISK_TEMPLATES
    if invalid_disk_templates:
      result.append("enabled disk templates list contains invalid entries:"
                    " %s" % utils.CommaJoin(invalid_disk_templates))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, ipolicy, iscluster):
      try:
        objects.InstancePolicy.CheckParameterSyntax(ipolicy, iscluster)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))
      for key, value in ipolicy.items():
        if key == constants.ISPECS_MINMAX:
          for k in range(len(value)):
            _helper_ispecs(owner, "ipolicy/%s[%s]" % (key, k), value[k])
        elif key == constants.ISPECS_STD:
          _helper(owner, "ipolicy/" + key, value,
                  constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    def _helper_ispecs(owner, parentkey, params):
      for (key, value) in params.items():
        fullkey = "/".join([parentkey, key])
        _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.ipolicy, True)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # disk template checks
      if instance.disk_template not in data.cluster.enabled_disk_templates:
        result.append("instance '%s' uses the disabled disk template '%s'." %
                      (instance_name, instance.disk_template))

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for (idx, dsk) in enumerate(instance.disks):
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

      wrong_names = _CheckInstanceDiskIvNames(instance.disks)
      if wrong_names:
        tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
                         (idx, exp_name, actual_name))
                        for (idx, exp_name, actual_name) in wrong_names)

        result.append("Instance '%s' has wrongly named disks: %s" %
                      (instance.name, tmp))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)
      used_globals = constants.NDC_GLOBALS.intersection(node.ndparams)
      if used_globals:
        result.append("Node '%s' has some global parameters set: %s" %
                      (node.name, utils.CommaJoin(used_globals)))

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy),
                      False)
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s/%s" % (link, nic.ip, nic.network),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
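
A sketch of how a caller might consume the verification result (the config
path is illustrative; offline mode avoids distributing the configuration):

cfg = ConfigWriter(cfg_file="/var/lib/ganeti/config.data", offline=True)
msgs = cfg.VerifyConfig()
for msg in msgs:
  logging.error("configuration error: %s", msg)
if not msgs:
  logging.info("configuration verified cleanly")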

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @warning: this method does not "flush" the configuration (via
        L{_WriteConfig}); callers should do that themselves once the
        configuration is stable

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
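
The shape of the returned pair, with hypothetical data:

d_map = {
  "node1.example.com": {0: "instance1.example.com",
                        1: "instance2.example.com"},
  "node2.example.com": {0: "instance1.example.com"},
  "node3.example.com": {},  # present even without any DRBD minors in use
}
# Each duplicate is (node, minor, instance_a, instance_b); a non-empty
# list means the configuration is corrupted.
duplicates = []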

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
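
The gap-filling behaviour comes from utils.FirstFree: with minors 0, 1 and 3
in use the next allocation fills the gap at 2, and only a dense set falls
through to keys[-1] + 1:

assert utils.FirstFree([0, 1, 3]) == 2     # fills the gap
assert utils.FirstFree([0, 1, 2]) is None  # dense: the caller uses 2 + 1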

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
        released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
        released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get the flag representing whether to use the external master IP setup
    script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetRsaHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDsaHostKey(self):
    """Return the dsa hostkey from the config.

    @rtype: string
    @return: the dsa hostkey

    """
    return self._config_data.cluster.dsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(
      name=cluster.master_node, ip=cluster.master_ip,
      netmask=cluster.master_netmask, netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add an UUID to the group if it doesn't have one or, if
        it does, ensure that it does not exist in the
        configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
           "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are member in the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeGroupInfo(self, group_uuids):
    """Get the configuration of multiple node groups.

    @param group_uuids: List of node group UUIDs
    @rtype: list
    @return: List of tuples of (group_uuid, group_info)

    """
    return [(uuid, self._UnlockedGetNodeGroup(uuid)) for uuid in group_uuids]

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._UnlockedCommitTemporaryIps(ec_id)
    self._WriteConfig()
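
A sketch of the normal creation flow from a job's perspective (names,
parameters and the job id are hypothetical, and a real instance would carry
more attributes); resources reserved earlier in the same execution context
become permanent here:

ec_id = "job-42"                    # hypothetical execution context id
mac = cfg.GenerateMAC(None, ec_id)  # temporary reservation only
inst = objects.Instance(name="instance1.example.com",
                        primary_node="node1.example.com",
                        disk_template=constants.DT_DISKLESS,
                        nics=[objects.NIC(mac=mac, nicparams={})],
                        disks=[])
cfg.AddInstance(inst, ec_id)        # reservations are committed here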

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status, disks_active):
    """Set the instance's status to a given value.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]

    if status is None:
      status = instance.admin_state
    if disks_active is None:
      disks_active = instance.disks_active

    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance.admin_state != status or \
       instance.disks_active != disks_active:
      instance.admin_state = status
      instance.disks_active = disks_active
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    This also sets the instance disks active flag.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP, True)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status to down in the config.

    This also clears the instance disks active flag.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE, False)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    instance = self._UnlockedGetInstanceInfo(instance_name)

    for nic in instance.nics:
      if nic.network and nic.ip:
        # Return all IP addresses to the respective address pools
        self._UnlockedCommitIp(constants.RELEASE_ACTION, nic.network, nic.ip)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

    # Operate on a copy so as not to lose the instance object in case of
    # a failure
    inst = self._config_data.instances[old_name].Copy()
    inst.name = new_name

    for (idx, disk) in enumerate(inst.disks):
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.logical_id = (disk.logical_id[0],
                           utils.PathJoin(file_storage_dir, inst.name,
                                          "disk%s" % idx))
        disk.physical_id = disk.logical_id

    # Actually replace instance object
    del self._config_data.instances[old_name]
    self._config_data.instances[inst.name] = inst

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    This does not touch the instance disks active flag, as shut down instances
    can still have active disks.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN, None)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDisksActive(self, instance_name):
    """Mark the status of instance disks active.

    """
    self._SetInstanceStatus(instance_name, None, True)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDisksInactive(self, instance_name):
    """Mark the status of instance disks inactive.

    """
    self._SetInstanceStatus(instance_name, None, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNetworks(self, instance_name):
    """Returns set of network UUIDs for instance's nics.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    networks = set()
    for nic in instance.nics:
      if nic.network:
        networks.add(nic.network)

    return frozenset(networks)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where instance_info
        is what GetInstanceInfo would return for that instance, while
        keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        GetInstanceInfo would return for that instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving instance object as parameter,
        returning boolean. Important: this function is called while the
        configuration lock is held. It must not do any complex work or call
        functions potentially leading to a deadlock. Ideally it doesn't call
        any other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
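
An example of a safe filter, a cheap attribute comparison only, as required
above (reusing the cfg writer from the earlier sketch; the node name is
hypothetical):

primaries = cfg.GetInstancesInfoByFilter(
  lambda inst: inst.primary_node == "node1.example.com")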
1717 1718 @locking.ssynchronized(_config_lock)
1719 - def AddNode(self, node, ec_id):
1720 """Add a node to the configuration. 1721 1722 @type node: L{objects.Node} 1723 @param node: a Node instance 1724 1725 """ 1726 logging.info("Adding node %s to configuration", node.name) 1727 1728 self._EnsureUUID(node, ec_id) 1729 1730 node.serial_no = 1 1731 node.ctime = node.mtime = time.time() 1732 self._UnlockedAddNodeToGroup(node.name, node.group) 1733 self._config_data.nodes[node.name] = node 1734 self._config_data.cluster.serial_no += 1 1735 self._WriteConfig()
1736 1737 @locking.ssynchronized(_config_lock)
1738 - def RemoveNode(self, node_name):
1739 """Remove a node from the configuration. 1740 1741 """ 1742 logging.info("Removing node %s from configuration", node_name) 1743 1744 if node_name not in self._config_data.nodes: 1745 raise errors.ConfigurationError("Unknown node '%s'" % node_name) 1746 1747 self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name]) 1748 del self._config_data.nodes[node_name] 1749 self._config_data.cluster.serial_no += 1 1750 self._WriteConfig()
1751
1752 - def ExpandNodeName(self, short_name):
1753 """Attempt to expand an incomplete node name. 1754 1755 """ 1756 # Locking is done in L{ConfigWriter.GetNodeList} 1757 return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1758
1759 - def _UnlockedGetNodeInfo(self, node_name):
1760 """Get the configuration of a node, as stored in the config. 1761 1762 This function is for internal use, when the config lock is already 1763 held. 1764 1765 @param node_name: the node name, e.g. I{node1.example.com} 1766 1767 @rtype: L{objects.Node} 1768 @return: the node object 1769 1770 """ 1771 if node_name not in self._config_data.nodes: 1772 return None 1773 1774 return self._config_data.nodes[node_name]
1775 1776 @locking.ssynchronized(_config_lock, shared=1)
1777 - def GetNodeInfo(self, node_name):
1778 """Get the configuration of a node, as stored in the config. 1779 1780 This is just a locked wrapper over L{_UnlockedGetNodeInfo}. 1781 1782 @param node_name: the node name, e.g. I{node1.example.com} 1783 1784 @rtype: L{objects.Node} 1785 @return: the node object 1786 1787 """ 1788 return self._UnlockedGetNodeInfo(node_name)
1789 1790 @locking.ssynchronized(_config_lock, shared=1)
1791 - def GetNodeInstances(self, node_name):
1792 """Get the instances of a node, as stored in the config. 1793 1794 @param node_name: the node name, e.g. I{node1.example.com} 1795 1796 @rtype: (list, list) 1797 @return: a tuple with two lists: the primary and the secondary instances 1798 1799 """ 1800 pri = [] 1801 sec = [] 1802 for inst in self._config_data.instances.values(): 1803 if inst.primary_node == node_name: 1804 pri.append(inst.name) 1805 if node_name in inst.secondary_nodes: 1806 sec.append(inst.name) 1807 return (pri, sec)
1808 1809 @locking.ssynchronized(_config_lock, shared=1)
1810 - def GetNodeGroupInstances(self, uuid, primary_only=False):
1811 """Get the instances of a node group. 1812 1813 @param uuid: Node group UUID 1814 @param primary_only: Whether to only consider primary nodes 1815 @rtype: frozenset 1816 @return: List of instance names in node group 1817 1818 """ 1819 if primary_only: 1820 nodes_fn = lambda inst: [inst.primary_node] 1821 else: 1822 nodes_fn = lambda inst: inst.all_nodes 1823 1824 return frozenset(inst.name 1825 for inst in self._config_data.instances.values() 1826 for node_name in nodes_fn(inst) 1827 if self._UnlockedGetNodeInfo(node_name).group == uuid)
1828
1829 - def _UnlockedGetNodeList(self):
1830 """Return the list of nodes which are in the configuration. 1831 1832 This function is for internal use, when the config lock is already 1833 held. 1834 1835 @rtype: list 1836 1837 """ 1838 return self._config_data.nodes.keys()
1839 1840 @locking.ssynchronized(_config_lock, shared=1)
1841 - def GetNodeList(self):
1842 """Return the list of nodes which are in the configuration. 1843 1844 """ 1845 return self._UnlockedGetNodeList()
1846
1847 - def _UnlockedGetOnlineNodeList(self):
1848 """Return the list of nodes which are online. 1849 1850 """ 1851 all_nodes = [self._UnlockedGetNodeInfo(node) 1852 for node in self._UnlockedGetNodeList()] 1853 return [node.name for node in all_nodes if not node.offline]
1854 1855 @locking.ssynchronized(_config_lock, shared=1)
1856 - def GetOnlineNodeList(self):
1857 """Return the list of nodes which are online. 1858 1859 """ 1860 return self._UnlockedGetOnlineNodeList()
1861 1862 @locking.ssynchronized(_config_lock, shared=1)
1863 - def GetVmCapableNodeList(self):
1864 """Return the list of nodes which are not vm capable. 1865 1866 """ 1867 all_nodes = [self._UnlockedGetNodeInfo(node) 1868 for node in self._UnlockedGetNodeList()] 1869 return [node.name for node in all_nodes if node.vm_capable]
1870 1871 @locking.ssynchronized(_config_lock, shared=1)
1872 - def GetNonVmCapableNodeList(self):
1873 """Return the list of nodes which are not vm capable. 1874 1875 """ 1876 all_nodes = [self._UnlockedGetNodeInfo(node) 1877 for node in self._UnlockedGetNodeList()] 1878 return [node.name for node in all_nodes if not node.vm_capable]
1879 1880 @locking.ssynchronized(_config_lock, shared=1)
1881 - def GetMultiNodeInfo(self, nodes):
1882 """Get the configuration of multiple nodes. 1883 1884 @param nodes: list of node names 1885 @rtype: list 1886 @return: list of tuples of (node, node_info), where node_info is 1887 what would GetNodeInfo return for the node, in the original 1888 order 1889 1890 """ 1891 return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1892 1893 @locking.ssynchronized(_config_lock, shared=1)
1894 - def GetAllNodesInfo(self):
1895 """Get the configuration of all nodes. 1896 1897 @rtype: dict 1898 @return: dict of (node, node_info), where node_info is what 1899 would GetNodeInfo return for the node 1900 1901 """ 1902 return self._UnlockedGetAllNodesInfo()
1903
1904 - def _UnlockedGetAllNodesInfo(self):
1905 """Gets configuration of all nodes. 1906 1907 @note: See L{GetAllNodesInfo} 1908 1909 """ 1910 return dict([(node, self._UnlockedGetNodeInfo(node)) 1911 for node in self._UnlockedGetNodeList()])
1912 1913 @locking.ssynchronized(_config_lock, shared=1)
1914 - def GetNodeGroupsFromNodes(self, nodes):
1915 """Returns groups for a list of nodes. 1916 1917 @type nodes: list of string 1918 @param nodes: List of node names 1919 @rtype: frozenset 1920 1921 """ 1922 return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1923
1924 - def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1925 """Get the number of current and maximum desired and possible candidates. 1926 1927 @type exceptions: list 1928 @param exceptions: if passed, list of nodes that should be ignored 1929 @rtype: tuple 1930 @return: tuple of (current, desired and possible, possible) 1931 1932 """ 1933 mc_now = mc_should = mc_max = 0 1934 for node in self._config_data.nodes.values(): 1935 if exceptions and node.name in exceptions: 1936 continue 1937 if not (node.offline or node.drained) and node.master_capable: 1938 mc_max += 1 1939 if node.master_candidate: 1940 mc_now += 1 1941 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size) 1942 return (mc_now, mc_should, mc_max)
1943 1944 @locking.ssynchronized(_config_lock, shared=1)
1945 - def GetMasterCandidateStats(self, exceptions=None):
1946 """Get the number of current and maximum possible candidates. 1947 1948 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}. 1949 1950 @type exceptions: list 1951 @param exceptions: if passed, list of nodes that should be ignored 1952 @rtype: tuple 1953 @return: tuple of (current, max) 1954 1955 """ 1956 return self._UnlockedGetMasterCandidateStats(exceptions)
1957 1958 @locking.ssynchronized(_config_lock)
1959 - def MaintainCandidatePool(self, exceptions):
1960 """Try to grow the candidate pool to the desired size. 1961 1962 @type exceptions: list 1963 @param exceptions: if passed, list of nodes that should be ignored 1964 @rtype: list 1965 @return: list with the adjusted nodes (L{objects.Node} instances) 1966 1967 """ 1968 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions) 1969 mod_list = [] 1970 if mc_now < mc_max: 1971 node_list = self._config_data.nodes.keys() 1972 random.shuffle(node_list) 1973 for name in node_list: 1974 if mc_now >= mc_max: 1975 break 1976 node = self._config_data.nodes[name] 1977 if (node.master_candidate or node.offline or node.drained or 1978 node.name in exceptions or not node.master_capable): 1979 continue 1980 mod_list.append(node) 1981 node.master_candidate = True 1982 node.serial_no += 1 1983 mc_now += 1 1984 if mc_now != mc_max: 1985 # this should not happen 1986 logging.warning("Warning: MaintainCandidatePool didn't manage to" 1987 " fill the candidate pool (%d/%d)", mc_now, mc_max) 1988 if mod_list: 1989 self._config_data.cluster.serial_no += 1 1990 self._WriteConfig() 1991 1992 return mod_list
1993
1994 - def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1995 """Add a given node to the specified group. 1996 1997 """ 1998 if nodegroup_uuid not in self._config_data.nodegroups: 1999 # This can happen if a node group gets deleted between its lookup and 2000 # when we're adding the first node to it, since we don't keep a lock in 2001 # the meantime. It's ok though, as we'll fail cleanly if the node group 2002 # is not found anymore. 2003 raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid) 2004 if node_name not in self._config_data.nodegroups[nodegroup_uuid].members: 2005 self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
2006
2007 - def _UnlockedRemoveNodeFromGroup(self, node):
2008 """Remove a given node from its group. 2009 2010 """ 2011 nodegroup = node.group 2012 if nodegroup not in self._config_data.nodegroups: 2013 logging.warning("Warning: node '%s' has unknown node group '%s'" 2014 " (while being removed from it)", node.name, nodegroup) 2015 nodegroup_obj = self._config_data.nodegroups[nodegroup] 2016 if node.name not in nodegroup_obj.members: 2017 logging.warning("Warning: node '%s' not a member of its node group '%s'" 2018 " (while being removed from it)", node.name, nodegroup) 2019 else: 2020 nodegroup_obj.members.remove(node.name)
2021 2022 @locking.ssynchronized(_config_lock)
2023 - def AssignGroupNodes(self, mods):
2024 """Changes the group of a number of nodes. 2025 2026 @type mods: list of tuples; (node name, new group UUID) 2027 @param mods: Node membership modifications 2028 2029 """ 2030 groups = self._config_data.nodegroups 2031 nodes = self._config_data.nodes 2032 2033 resmod = [] 2034 2035 # Try to resolve names/UUIDs first 2036 for (node_name, new_group_uuid) in mods: 2037 try: 2038 node = nodes[node_name] 2039 except KeyError: 2040 raise errors.ConfigurationError("Unable to find node '%s'" % node_name) 2041 2042 if node.group == new_group_uuid: 2043 # Node is being assigned to its current group 2044 logging.debug("Node '%s' was assigned to its current group (%s)", 2045 node_name, node.group) 2046 continue 2047 2048 # Try to find current group of node 2049 try: 2050 old_group = groups[node.group] 2051 except KeyError: 2052 raise errors.ConfigurationError("Unable to find old group '%s'" % 2053 node.group) 2054 2055 # Try to find new group for node 2056 try: 2057 new_group = groups[new_group_uuid] 2058 except KeyError: 2059 raise errors.ConfigurationError("Unable to find new group '%s'" % 2060 new_group_uuid) 2061 2062 assert node.name in old_group.members, \ 2063 ("Inconsistent configuration: node '%s' not listed in members for its" 2064 " old group '%s'" % (node.name, old_group.uuid)) 2065 assert node.name not in new_group.members, \ 2066 ("Inconsistent configuration: node '%s' already listed in members for" 2067 " its new group '%s'" % (node.name, new_group.uuid)) 2068 2069 resmod.append((node, old_group, new_group)) 2070 2071 # Apply changes 2072 for (node, old_group, new_group) in resmod: 2073 assert node.uuid != new_group.uuid and old_group.uuid != new_group.uuid, \ 2074 "Assigning to current group is not possible" 2075 2076 node.group = new_group.uuid 2077 2078 # Update members of involved groups 2079 if node.name in old_group.members: 2080 old_group.members.remove(node.name) 2081 if node.name not in new_group.members: 2082 new_group.members.append(node.name) 2083 2084 # Update timestamps and serials (only once per node/group object) 2085 now = time.time() 2086 for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142 2087 obj.serial_no += 1 2088 obj.mtime = now 2089 2090 # Force ssconf update 2091 self._config_data.cluster.serial_no += 1 2092 2093 self._WriteConfig()
2094
2095 - def _BumpSerialNo(self):
2096 """Bump up the serial number of the config. 2097 2098 """ 2099 self._config_data.serial_no += 1 2100 self._config_data.mtime = time.time()
2101
2102 - def _AllUUIDObjects(self):
2103 """Returns all objects with uuid attributes. 2104 2105 """ 2106 return (self._config_data.instances.values() + 2107 self._config_data.nodes.values() + 2108 self._config_data.nodegroups.values() + 2109 self._config_data.networks.values() + 2110 self._AllDisks() + 2111 self._AllNICs() + 2112 [self._config_data.cluster])
2113
2114 - def _OpenConfig(self, accept_foreign):
2115 """Read the config data from disk. 2116 2117 """ 2118 raw_data = utils.ReadFile(self._cfg_file) 2119 2120 try: 2121 data = objects.ConfigData.FromDict(serializer.Load(raw_data)) 2122 except Exception, err: 2123 raise errors.ConfigurationError(err) 2124 2125 # Make sure the configuration has the right version 2126 _ValidateConfig(data) 2127 2128 if (not hasattr(data, "cluster") or 2129 not hasattr(data.cluster, "rsahostkeypub")): 2130 raise errors.ConfigurationError("Incomplete configuration" 2131 " (missing cluster.rsahostkeypub)") 2132 2133 if data.cluster.master_node != self._my_hostname and not accept_foreign: 2134 msg = ("The configuration denotes node %s as master, while my" 2135 " hostname is %s; opening a foreign configuration is only" 2136 " possible in accept_foreign mode" % 2137 (data.cluster.master_node, self._my_hostname)) 2138 raise errors.ConfigurationError(msg) 2139 2140 self._config_data = data 2141 # reset the last serial as -1 so that the next write will cause 2142 # ssconf update 2143 self._last_cluster_serial = -1 2144 2145 # Upgrade configuration if needed 2146 self._UpgradeConfig() 2147 2148 self._cfg_id = utils.GetFileID(path=self._cfg_file)
2149
2150 - def _UpgradeConfig(self):
2151 """Run any upgrade steps. 2152 2153 This method performs both in-object upgrades and also update some data 2154 elements that need uniqueness across the whole configuration or interact 2155 with other objects. 2156 2157 @warning: this function will call L{_WriteConfig()}, but also 2158 L{DropECReservations} so it needs to be called only from a 2159 "safe" place (the constructor). If one wanted to call it with 2160 the lock held, a DropECReservationUnlocked would need to be 2161 created first, to avoid causing deadlock. 2162 2163 """ 2164 # Keep a copy of the persistent part of _config_data to check for changes 2165 # Serialization doesn't guarantee order in dictionaries 2166 oldconf = copy.deepcopy(self._config_data.ToDict()) 2167 2168 # In-object upgrades 2169 self._config_data.UpgradeConfig() 2170 2171 for item in self._AllUUIDObjects(): 2172 if item.uuid is None: 2173 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID) 2174 if not self._config_data.nodegroups: 2175 default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME 2176 default_nodegroup = objects.NodeGroup(name=default_nodegroup_name, 2177 members=[]) 2178 self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True) 2179 for node in self._config_data.nodes.values(): 2180 if not node.group: 2181 node.group = self.LookupNodeGroup(None) 2182 # This is technically *not* an upgrade, but needs to be done both when 2183 # nodegroups are being added, and upon normally loading the config, 2184 # because the members list of a node group is discarded upon 2185 # serializing/deserializing the object. 2186 self._UnlockedAddNodeToGroup(node.name, node.group) 2187 2188 modified = (oldconf != self._config_data.ToDict()) 2189 if modified: 2190 self._WriteConfig() 2191 # This is ok even if it acquires the internal lock, as _UpgradeConfig is 2192 # only called at config init time, without the lock held 2193 self.DropECReservations(_UPGRADE_CONFIG_JID) 2194 else: 2195 config_errors = self._UnlockedVerifyConfig() 2196 if config_errors: 2197 errmsg = ("Loaded configuration data is not consistent: %s" % 2198 (utils.CommaJoin(config_errors))) 2199 logging.critical(errmsg)
2200
2201 - def _DistributeConfig(self, feedback_fn):
2202 """Distribute the configuration to the other nodes. 2203 2204 Currently, this only copies the configuration file. In the future, 2205 it could be used to encapsulate the 2/3-phase update mechanism. 2206 2207 """ 2208 if self._offline: 2209 return True 2210 2211 bad = False 2212 2213 node_list = [] 2214 addr_list = [] 2215 myhostname = self._my_hostname 2216 # we can skip checking whether _UnlockedGetNodeInfo returns None 2217 # since the node list comes from _UnlocketGetNodeList, and we are 2218 # called with the lock held, so no modifications should take place 2219 # in between 2220 for node_name in self._UnlockedGetNodeList(): 2221 if node_name == myhostname: 2222 continue 2223 node_info = self._UnlockedGetNodeInfo(node_name) 2224 if not node_info.master_candidate: 2225 continue 2226 node_list.append(node_info.name) 2227 addr_list.append(node_info.primary_ip) 2228 2229 # TODO: Use dedicated resolver talking to config writer for name resolution 2230 result = \ 2231 self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file) 2232 for to_node, to_result in result.items(): 2233 msg = to_result.fail_msg 2234 if msg: 2235 msg = ("Copy of file %s to node %s failed: %s" % 2236 (self._cfg_file, to_node, msg)) 2237 logging.error(msg) 2238 2239 if feedback_fn: 2240 feedback_fn(msg) 2241 2242 bad = True 2243 2244 return not bad
2245
2246 - def _WriteConfig(self, destination=None, feedback_fn=None):
2247 """Write the configuration data to persistent storage. 2248 2249 """ 2250 assert feedback_fn is None or callable(feedback_fn) 2251 2252 # Warn on config errors, but don't abort the save - the 2253 # configuration has already been modified, and we can't revert; 2254 # the best we can do is to warn the user and save as is, leaving 2255 # recovery to the user 2256 config_errors = self._UnlockedVerifyConfig() 2257 if config_errors: 2258 errmsg = ("Configuration data is not consistent: %s" % 2259 (utils.CommaJoin(config_errors))) 2260 logging.critical(errmsg) 2261 if feedback_fn: 2262 feedback_fn(errmsg) 2263 2264 if destination is None: 2265 destination = self._cfg_file 2266 self._BumpSerialNo() 2267 txt = serializer.Dump(self._config_data.ToDict()) 2268 2269 getents = self._getents() 2270 try: 2271 fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt, 2272 close=False, gid=getents.confd_gid, mode=0640) 2273 except errors.LockError: 2274 raise errors.ConfigurationError("The configuration file has been" 2275 " modified since the last write, cannot" 2276 " update") 2277 try: 2278 self._cfg_id = utils.GetFileID(fd=fd) 2279 finally: 2280 os.close(fd) 2281 2282 self.write_count += 1 2283 2284 # and redistribute the config file to master candidates 2285 self._DistributeConfig(feedback_fn) 2286 2287 # Write ssconf files on all nodes (including locally) 2288 if self._last_cluster_serial < self._config_data.cluster.serial_no: 2289 if not self._offline: 2290 result = self._GetRpc(None).call_write_ssconf_files( 2291 self._UnlockedGetOnlineNodeList(), 2292 self._UnlockedGetSsconfValues()) 2293 2294 for nname, nresu in result.items(): 2295 msg = nresu.fail_msg 2296 if msg: 2297 errmsg = ("Error while uploading ssconf files to" 2298 " node %s: %s" % (nname, msg)) 2299 logging.warning(errmsg) 2300 2301 if feedback_fn: 2302 feedback_fn(errmsg) 2303 2304 self._last_cluster_serial = self._config_data.cluster.serial_no
2305
2306 - def _UnlockedGetSsconfValues(self):
2307 """Return the values needed by ssconf. 2308 2309 @rtype: dict 2310 @return: a dictionary with keys the ssconf names and values their 2311 associated value 2312 2313 """ 2314 fn = "\n".join 2315 instance_names = utils.NiceSort(self._UnlockedGetInstanceList()) 2316 node_names = utils.NiceSort(self._UnlockedGetNodeList()) 2317 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names] 2318 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip) 2319 for ninfo in node_info] 2320 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip) 2321 for ninfo in node_info] 2322 2323 instance_data = fn(instance_names) 2324 off_data = fn(node.name for node in node_info if node.offline) 2325 on_data = fn(node.name for node in node_info if not node.offline) 2326 mc_data = fn(node.name for node in node_info if node.master_candidate) 2327 mc_ips_data = fn(node.primary_ip for node in node_info 2328 if node.master_candidate) 2329 node_data = fn(node_names) 2330 node_pri_ips_data = fn(node_pri_ips) 2331 node_snd_ips_data = fn(node_snd_ips) 2332 2333 cluster = self._config_data.cluster 2334 cluster_tags = fn(cluster.GetTags()) 2335 2336 hypervisor_list = fn(cluster.enabled_hypervisors) 2337 2338 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n") 2339 2340 nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in 2341 self._config_data.nodegroups.values()] 2342 nodegroups_data = fn(utils.NiceSort(nodegroups)) 2343 networks = ["%s %s" % (net.uuid, net.name) for net in 2344 self._config_data.networks.values()] 2345 networks_data = fn(utils.NiceSort(networks)) 2346 2347 ssconf_values = { 2348 constants.SS_CLUSTER_NAME: cluster.cluster_name, 2349 constants.SS_CLUSTER_TAGS: cluster_tags, 2350 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir, 2351 constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir, 2352 constants.SS_MASTER_CANDIDATES: mc_data, 2353 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data, 2354 constants.SS_MASTER_IP: cluster.master_ip, 2355 constants.SS_MASTER_NETDEV: cluster.master_netdev, 2356 constants.SS_MASTER_NETMASK: str(cluster.master_netmask), 2357 constants.SS_MASTER_NODE: cluster.master_node, 2358 constants.SS_NODE_LIST: node_data, 2359 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data, 2360 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data, 2361 constants.SS_OFFLINE_NODES: off_data, 2362 constants.SS_ONLINE_NODES: on_data, 2363 constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family), 2364 constants.SS_INSTANCE_LIST: instance_data, 2365 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION, 2366 constants.SS_HYPERVISOR_LIST: hypervisor_list, 2367 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health), 2368 constants.SS_UID_POOL: uid_pool, 2369 constants.SS_NODEGROUPS: nodegroups_data, 2370 constants.SS_NETWORKS: networks_data, 2371 } 2372 bad_values = [(k, v) for k, v in ssconf_values.items() 2373 if not isinstance(v, (str, basestring))] 2374 if bad_values: 2375 err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values) 2376 raise errors.ConfigurationError("Some ssconf key(s) have non-string" 2377 " values: %s" % err) 2378 return ssconf_values
2379 2380 @locking.ssynchronized(_config_lock, shared=1)
2381 - def GetSsconfValues(self):
2382 """Wrapper using lock around _UnlockedGetSsconf(). 2383 2384 """ 2385 return self._UnlockedGetSsconfValues()
2386 2387 @locking.ssynchronized(_config_lock, shared=1)
2388 - def GetVGName(self):
2389 """Return the volume group name. 2390 2391 """ 2392 return self._config_data.cluster.volume_group_name
2393 2394 @locking.ssynchronized(_config_lock)
2395 - def SetVGName(self, vg_name):
2396 """Set the volume group name. 2397 2398 """ 2399 self._config_data.cluster.volume_group_name = vg_name 2400 self._config_data.cluster.serial_no += 1 2401 self._WriteConfig()
2402 2403 @locking.ssynchronized(_config_lock, shared=1)
2404 - def GetDRBDHelper(self):
2405 """Return DRBD usermode helper. 2406 2407 """ 2408 return self._config_data.cluster.drbd_usermode_helper
2409 2410 @locking.ssynchronized(_config_lock)
2411 - def SetDRBDHelper(self, drbd_helper):
2412 """Set DRBD usermode helper. 2413 2414 """ 2415 self._config_data.cluster.drbd_usermode_helper = drbd_helper 2416 self._config_data.cluster.serial_no += 1 2417 self._WriteConfig()
2418 2419 @locking.ssynchronized(_config_lock, shared=1)
2420 - def GetMACPrefix(self):
2421 """Return the mac prefix. 2422 2423 """ 2424 return self._config_data.cluster.mac_prefix
2425 2426 @locking.ssynchronized(_config_lock, shared=1)
2427 - def GetClusterInfo(self):
2428 """Returns information about the cluster 2429 2430 @rtype: L{objects.Cluster} 2431 @return: the cluster object 2432 2433 """ 2434 return self._config_data.cluster
2435 2436 @locking.ssynchronized(_config_lock, shared=1)
2437 - def HasAnyDiskOfType(self, dev_type):
2438 """Check if in there is at disk of the given type in the configuration. 2439 2440 """ 2441 return self._config_data.HasAnyDiskOfType(dev_type)
2442 2443 @locking.ssynchronized(_config_lock)
2444 - def Update(self, target, feedback_fn, ec_id=None):
2445 """Notify function to be called after updates. 2446 2447 This function must be called when an object (as returned by 2448 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the 2449 caller wants the modifications saved to the backing store. Note 2450 that all modified objects will be saved, but the target argument 2451 is the one the caller wants to ensure that it's saved. 2452 2453 @param target: an instance of either L{objects.Cluster}, 2454 L{objects.Node} or L{objects.Instance} which is existing in 2455 the cluster 2456 @param feedback_fn: Callable feedback function 2457 2458 """ 2459 if self._config_data is None: 2460 raise errors.ProgrammerError("Configuration file not read," 2461 " cannot save.") 2462 update_serial = False 2463 if isinstance(target, objects.Cluster): 2464 test = target == self._config_data.cluster 2465 elif isinstance(target, objects.Node): 2466 test = target in self._config_data.nodes.values() 2467 update_serial = True 2468 elif isinstance(target, objects.Instance): 2469 test = target in self._config_data.instances.values() 2470 elif isinstance(target, objects.NodeGroup): 2471 test = target in self._config_data.nodegroups.values() 2472 elif isinstance(target, objects.Network): 2473 test = target in self._config_data.networks.values() 2474 else: 2475 raise errors.ProgrammerError("Invalid object type (%s) passed to" 2476 " ConfigWriter.Update" % type(target)) 2477 if not test: 2478 raise errors.ConfigurationError("Configuration updated since object" 2479 " has been read or unknown object") 2480 target.serial_no += 1 2481 target.mtime = now = time.time() 2482 2483 if update_serial: 2484 # for node updates, we need to increase the cluster serial too 2485 self._config_data.cluster.serial_no += 1 2486 self._config_data.cluster.mtime = now 2487 2488 if isinstance(target, objects.Instance): 2489 self._UnlockedReleaseDRBDMinors(target.name) 2490 2491 if ec_id is not None: 2492 # Commit all ips reserved by OpInstanceSetParams and OpGroupSetParams 2493 self._UnlockedCommitTemporaryIps(ec_id) 2494 2495 self._WriteConfig(feedback_fn=feedback_fn)
2496 2497 @locking.ssynchronized(_config_lock)
2498 - def DropECReservations(self, ec_id):
2499 """Drop per-execution-context reservations 2500 2501 """ 2502 for rm in self._all_rms: 2503 rm.DropECReservations(ec_id)
2504 2505 @locking.ssynchronized(_config_lock, shared=1)
2506 - def GetAllNetworksInfo(self):
2507 """Get configuration info of all the networks. 2508 2509 """ 2510 return dict(self._config_data.networks)
2511
2512 - def _UnlockedGetNetworkList(self):
2513 """Get the list of networks. 2514 2515 This function is for internal use, when the config lock is already held. 2516 2517 """ 2518 return self._config_data.networks.keys()
2519 2520 @locking.ssynchronized(_config_lock, shared=1)
2521 - def GetNetworkList(self):
2522 """Get the list of networks. 2523 2524 @return: array of networks, ex. ["main", "vlan100", "200] 2525 2526 """ 2527 return self._UnlockedGetNetworkList()
2528 2529 @locking.ssynchronized(_config_lock, shared=1)
2530 - def GetNetworkNames(self):
2531 """Get a list of network names 2532 2533 """ 2534 names = [net.name 2535 for net in self._config_data.networks.values()] 2536 return names
2537
2538 - def _UnlockedGetNetwork(self, uuid):
2539 """Returns information about a network. 2540 2541 This function is for internal use, when the config lock is already held. 2542 2543 """ 2544 if uuid not in self._config_data.networks: 2545 return None 2546 2547 return self._config_data.networks[uuid]
2548 2549 @locking.ssynchronized(_config_lock, shared=1)
2550 - def GetNetwork(self, uuid):
2551 """Returns information about a network. 2552 2553 It takes the information from the configuration file. 2554 2555 @param uuid: UUID of the network 2556 2557 @rtype: L{objects.Network} 2558 @return: the network object 2559 2560 """ 2561 return self._UnlockedGetNetwork(uuid)
2562 2563 @locking.ssynchronized(_config_lock)
2564 - def AddNetwork(self, net, ec_id, check_uuid=True):
2565 """Add a network to the configuration. 2566 2567 @type net: L{objects.Network} 2568 @param net: the Network object to add 2569 @type ec_id: string 2570 @param ec_id: unique id for the job to use when creating a missing UUID 2571 2572 """ 2573 self._UnlockedAddNetwork(net, ec_id, check_uuid) 2574 self._WriteConfig()
2575
2576 - def _UnlockedAddNetwork(self, net, ec_id, check_uuid):
2577 """Add a network to the configuration. 2578 2579 """ 2580 logging.info("Adding network %s to configuration", net.name) 2581 2582 if check_uuid: 2583 self._EnsureUUID(net, ec_id) 2584 2585 net.serial_no = 1 2586 net.ctime = net.mtime = time.time() 2587 self._config_data.networks[net.uuid] = net 2588 self._config_data.cluster.serial_no += 1
2589
2590 - def _UnlockedLookupNetwork(self, target):
2591 """Lookup a network's UUID. 2592 2593 @type target: string 2594 @param target: network name or UUID 2595 @rtype: string 2596 @return: network UUID 2597 @raises errors.OpPrereqError: when the target network cannot be found 2598 2599 """ 2600 if target is None: 2601 return None 2602 if target in self._config_data.networks: 2603 return target 2604 for net in self._config_data.networks.values(): 2605 if net.name == target: 2606 return net.uuid 2607 raise errors.OpPrereqError("Network '%s' not found" % target, 2608 errors.ECODE_NOENT)
2609 2610 @locking.ssynchronized(_config_lock, shared=1)
2611 - def LookupNetwork(self, target):
2612 """Lookup a network's UUID. 2613 2614 This function is just a wrapper over L{_UnlockedLookupNetwork}. 2615 2616 @type target: string 2617 @param target: network name or UUID 2618 @rtype: string 2619 @return: network UUID 2620 2621 """ 2622 return self._UnlockedLookupNetwork(target)
2623 2624 @locking.ssynchronized(_config_lock)
2625 - def RemoveNetwork(self, network_uuid):
2626 """Remove a network from the configuration. 2627 2628 @type network_uuid: string 2629 @param network_uuid: the UUID of the network to remove 2630 2631 """ 2632 logging.info("Removing network %s from configuration", network_uuid) 2633 2634 if network_uuid not in self._config_data.networks: 2635 raise errors.ConfigurationError("Unknown network '%s'" % network_uuid) 2636 2637 del self._config_data.networks[network_uuid] 2638 self._config_data.cluster.serial_no += 1 2639 self._WriteConfig()
2640
2641 - def _UnlockedGetGroupNetParams(self, net_uuid, node):
2642 """Get the netparams (mode, link) of a network. 2643 2644 Get a network's netparams for a given node. 2645 2646 @type net_uuid: string 2647 @param net_uuid: network uuid 2648 @type node: string 2649 @param node: node name 2650 @rtype: dict or None 2651 @return: netparams 2652 2653 """ 2654 node_info = self._UnlockedGetNodeInfo(node) 2655 nodegroup_info = self._UnlockedGetNodeGroup(node_info.group) 2656 netparams = nodegroup_info.networks.get(net_uuid, None) 2657 2658 return netparams
2659 2660 @locking.ssynchronized(_config_lock, shared=1)
2661 - def GetGroupNetParams(self, net_uuid, node):
2662 """Locking wrapper of _UnlockedGetGroupNetParams() 2663 2664 """ 2665 return self._UnlockedGetGroupNetParams(net_uuid, node)
2666 2667 @locking.ssynchronized(_config_lock, shared=1)
2668 - def CheckIPInNodeGroup(self, ip, node):
2669 """Check IP uniqueness in nodegroup. 2670 2671 Check networks that are connected in the node's node group 2672 if ip is contained in any of them. Used when creating/adding 2673 a NIC to ensure uniqueness among nodegroups. 2674 2675 @type ip: string 2676 @param ip: ip address 2677 @type node: string 2678 @param node: node name 2679 @rtype: (string, dict) or (None, None) 2680 @return: (network name, netparams) 2681 2682 """ 2683 if ip is None: 2684 return (None, None) 2685 node_info = self._UnlockedGetNodeInfo(node) 2686 nodegroup_info = self._UnlockedGetNodeGroup(node_info.group) 2687 for net_uuid in nodegroup_info.networks.keys(): 2688 net_info = self._UnlockedGetNetwork(net_uuid) 2689 pool = network.AddressPool(net_info) 2690 if pool.Contains(ip): 2691 return (net_info.name, nodegroup_info.networks[net_uuid]) 2692 2693 return (None, None)
2694