Package ganeti :: Module config
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.config

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Configuration management for Ganeti 
  23   
  24  This module provides the interface to the Ganeti cluster configuration. 
  25   
  26  The configuration data is stored on every node but is updated on the master 
  27  only. After each update, the master distributes the data to the other nodes. 
  28   
  29  Currently, the data storage format is JSON. YAML was slow and consuming too 
  30  much memory. 
  31   
  32  """ 
  33   
  34  # pylint: disable=R0904 
  35  # R0904: Too many public methods 
  36   
  37  import copy 
  38  import os 
  39  import random 
  40  import logging 
  41  import time 
  42  import itertools 
  43   
  44  from ganeti import errors 
  45  from ganeti import locking 
  46  from ganeti import utils 
  47  from ganeti import constants 
  48  from ganeti import rpc 
  49  from ganeti import objects 
  50  from ganeti import serializer 
  51  from ganeti import uidpool 
  52  from ganeti import netutils 
  53  from ganeti import runtime 
  54  from ganeti import pathutils 
  55  from ganeti import network 
  56   
  57   
  58  _config_lock = locking.SharedLock("ConfigWriter") 
  59   
  60  # job id used for resource management at config upgrade time 
  61  _UPGRADE_CONFIG_JID = "jid-cfg-upgrade" 
def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  Only the version field of the configuration is checked here.

  @raise errors.ConfigVersionMismatch: if the version differs from what
      we expect

  """
  expected = constants.CONFIG_VERSION
  if data.version != expected:
    raise errors.ConfigVersionMismatch(expected, data.version)
75
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # map of execution context (job) id -> set of resources it reserved
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether the given resource is reserved by any holder.

    @return: True if some execution context holds C{resource}

    """
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource on behalf of an execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations made by the given execution context.

    """
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources.

    """
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def GetECReserved(self, ec_id):
    """ Used when you want to retrieve all reservations for a specific
        execution context. E.g when committing reserved IPs for a specific
        network.

    """
    ec_reserved = set()
    if ec_id in self._ec_reserved:
      ec_reserved.update(self._ec_reserved[ec_id])
    return ec_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: container of values that must not be returned
    @param generate_one_fn: callable producing one candidate resource
    @param ec_id: execution context to register the reservation under
    @raise errors.ConfigurationError: if no free resource could be
        generated within the retry budget

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      # FIX: the counter was never decremented before, so a generator
      # that only produces duplicates (or None) looped forever instead
      # of hitting the error path below
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
141
def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  Performs a case-insensitive match of C{short_name} against C{names}.

  """
  match = utils.MatchNameComponent(short_name, names, case_sensitive=False)
  return match
148
149 150 -def _CheckInstanceDiskIvNames(disks):
151 """Checks if instance's disks' C{iv_name} attributes are in order. 152 153 @type disks: list of L{objects.Disk} 154 @param disks: List of disks 155 @rtype: list of tuples; (int, string, string) 156 @return: List of wrongly named disks, each tuple contains disk index, 157 expected and actual name 158 159 """ 160 result = [] 161 162 for (idx, disk) in enumerate(disks): 163 exp_iv_name = "disk/%s" % idx 164 if disk.iv_name != exp_iv_name: 165 result.append((idx, exp_iv_name, disk.iv_name)) 166 167 return result
168
class ConfigWriter:
  """The interface to the cluster configuration.

  All public methods are synchronized on the module-level
  C{_config_lock} shared lock via C{locking.ssynchronized}; the
  C{_Unlocked*} variants assume the caller already holds it.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    """Initialize the writer and open the configuration file.

    @type cfg_file: string or None
    @param cfg_file: path to the configuration file; if None, the
        default path C{pathutils.CLUSTER_CONF_FILE} is used
    @type offline: boolean
    @param offline: offline-mode flag (only stored here; consumers are
        elsewhere in the class)
    @param _getents: callable returning runtime entities; overridable
        mainly for testing
    @type accept_foreign: boolean
    @param accept_foreign: passed through to L{_OpenConfig}

    """
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = pathutils.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    # per-resource-type temporary reservation managers
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._temporary_ips = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs,
                     self._temporary_ips]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    # must come last: reads the attributes initialized above
    self._OpenConfig(accept_foreign)
206
207 - def _GetRpc(self, address_list):
208 """Returns RPC runner for configuration. 209 210 """ 211 return rpc.ConfigRunner(self._context, address_list)
212
213 - def SetContext(self, context):
214 """Sets Ganeti context. 215 216 """ 217 self._context = context
218 219 # this method needs to be static, so that we can call it on the class 220 @staticmethod
221 - def IsCluster():
222 """Check if the cluster is configured. 223 224 """ 225 return os.path.exists(pathutils.CLUSTER_CONF_FILE)
226 227 @locking.ssynchronized(_config_lock, shared=1)
228 - def GetNdParams(self, node):
229 """Get the node params populated with cluster defaults. 230 231 @type node: L{objects.Node} 232 @param node: The node we want to know the params for 233 @return: A dict with the filled in node params 234 235 """ 236 nodegroup = self._UnlockedGetNodeGroup(node.group) 237 return self._config_data.cluster.FillND(node, nodegroup)
238 239 @locking.ssynchronized(_config_lock, shared=1)
240 - def GetInstanceDiskParams(self, instance):
241 """Get the disk params populated with inherit chain. 242 243 @type instance: L{objects.Instance} 244 @param instance: The instance we want to know the params for 245 @return: A dict with the filled in disk params 246 247 """ 248 node = self._UnlockedGetNodeInfo(instance.primary_node) 249 nodegroup = self._UnlockedGetNodeGroup(node.group) 250 return self._UnlockedGetGroupDiskParams(nodegroup)
251 252 @locking.ssynchronized(_config_lock, shared=1)
253 - def GetGroupDiskParams(self, group):
254 """Get the disk params populated with inherit chain. 255 256 @type group: L{objects.NodeGroup} 257 @param group: The group we want to know the params for 258 @return: A dict with the filled in disk params 259 260 """ 261 return self._UnlockedGetGroupDiskParams(group)
262
263 - def _UnlockedGetGroupDiskParams(self, group):
264 """Get the disk params populated with inherit chain down to node-group. 265 266 @type group: L{objects.NodeGroup} 267 @param group: The group we want to know the params for 268 @return: A dict with the filled in disk params 269 270 """ 271 return self._config_data.cluster.SimpleFillDP(group.diskparams)
272
273 - def _UnlockedGetNetworkMACPrefix(self, net_uuid):
274 """Return the network mac prefix if it exists or the cluster level default. 275 276 """ 277 prefix = None 278 if net_uuid: 279 nobj = self._UnlockedGetNetwork(net_uuid) 280 if nobj.mac_prefix: 281 prefix = nobj.mac_prefix 282 283 return prefix
284
285 - def _GenerateOneMAC(self, prefix=None):
286 """Return a function that randomly generates a MAC suffic 287 and appends it to the given prefix. If prefix is not given get 288 the cluster level default. 289 290 """ 291 if not prefix: 292 prefix = self._config_data.cluster.mac_prefix 293 294 def GenMac(): 295 byte1 = random.randrange(0, 256) 296 byte2 = random.randrange(0, 256) 297 byte3 = random.randrange(0, 256) 298 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3) 299 return mac
300 301 return GenMac
302 303 @locking.ssynchronized(_config_lock, shared=1)
304 - def GenerateMAC(self, net_uuid, ec_id):
305 """Generate a MAC for an instance. 306 307 This should check the current instances for duplicates. 308 309 """ 310 existing = self._AllMACs() 311 prefix = self._UnlockedGetNetworkMACPrefix(net_uuid) 312 gen_mac = self._GenerateOneMAC(prefix) 313 return self._temporary_ids.Generate(existing, gen_mac, ec_id)
314 315 @locking.ssynchronized(_config_lock, shared=1)
316 - def ReserveMAC(self, mac, ec_id):
317 """Reserve a MAC for an instance. 318 319 This only checks instances managed by this cluster, it does not 320 check for potential collisions elsewhere. 321 322 """ 323 all_macs = self._AllMACs() 324 if mac in all_macs: 325 raise errors.ReservationError("mac already in use") 326 else: 327 self._temporary_macs.Reserve(ec_id, mac)
328
329 - def _UnlockedCommitTemporaryIps(self, ec_id):
330 """Commit all reserved IP address to their respective pools 331 332 """ 333 for action, address, net_uuid in self._temporary_ips.GetECReserved(ec_id): 334 self._UnlockedCommitIp(action, net_uuid, address)
335
336 - def _UnlockedCommitIp(self, action, net_uuid, address):
337 """Commit a reserved IP address to an IP pool. 338 339 The IP address is taken from the network's IP pool and marked as reserved. 340 341 """ 342 nobj = self._UnlockedGetNetwork(net_uuid) 343 pool = network.AddressPool(nobj) 344 if action == constants.RESERVE_ACTION: 345 pool.Reserve(address) 346 elif action == constants.RELEASE_ACTION: 347 pool.Release(address)
348
349 - def _UnlockedReleaseIp(self, net_uuid, address, ec_id):
350 """Give a specific IP address back to an IP pool. 351 352 The IP address is returned to the IP pool designated by pool_id and marked 353 as reserved. 354 355 """ 356 self._temporary_ips.Reserve(ec_id, 357 (constants.RELEASE_ACTION, address, net_uuid))
358 359 @locking.ssynchronized(_config_lock, shared=1)
360 - def ReleaseIp(self, net_uuid, address, ec_id):
361 """Give a specified IP address back to an IP pool. 362 363 This is just a wrapper around _UnlockedReleaseIp. 364 365 """ 366 if net_uuid: 367 self._UnlockedReleaseIp(net_uuid, address, ec_id)
368 369 @locking.ssynchronized(_config_lock, shared=1)
370 - def GenerateIp(self, net_uuid, ec_id):
371 """Find a free IPv4 address for an instance. 372 373 """ 374 nobj = self._UnlockedGetNetwork(net_uuid) 375 pool = network.AddressPool(nobj) 376 377 def gen_one(): 378 try: 379 ip = pool.GenerateFree() 380 except errors.AddressPoolError: 381 raise errors.ReservationError("Cannot generate IP. Network is full") 382 return (constants.RESERVE_ACTION, ip, net_uuid)
383 384 _, address, _ = self._temporary_ips.Generate([], gen_one, ec_id) 385 return address 386
387 - def _UnlockedReserveIp(self, net_uuid, address, ec_id):
388 """Reserve a given IPv4 address for use by an instance. 389 390 """ 391 nobj = self._UnlockedGetNetwork(net_uuid) 392 pool = network.AddressPool(nobj) 393 try: 394 isreserved = pool.IsReserved(address) 395 except errors.AddressPoolError: 396 raise errors.ReservationError("IP address not in network") 397 if isreserved: 398 raise errors.ReservationError("IP address already in use") 399 400 return self._temporary_ips.Reserve(ec_id, 401 (constants.RESERVE_ACTION, 402 address, net_uuid))
403 404 @locking.ssynchronized(_config_lock, shared=1)
405 - def ReserveIp(self, net_uuid, address, ec_id):
406 """Reserve a given IPv4 address for use by an instance. 407 408 """ 409 if net_uuid: 410 return self._UnlockedReserveIp(net_uuid, address, ec_id)
411 412 @locking.ssynchronized(_config_lock, shared=1)
413 - def ReserveLV(self, lv_name, ec_id):
414 """Reserve an VG/LV pair for an instance. 415 416 @type lv_name: string 417 @param lv_name: the logical volume name to reserve 418 419 """ 420 all_lvs = self._AllLVs() 421 if lv_name in all_lvs: 422 raise errors.ReservationError("LV already in use") 423 else: 424 self._temporary_lvs.Reserve(ec_id, lv_name)
425 426 @locking.ssynchronized(_config_lock, shared=1)
427 - def GenerateDRBDSecret(self, ec_id):
428 """Generate a DRBD secret. 429 430 This checks the current disks for duplicates. 431 432 """ 433 return self._temporary_secrets.Generate(self._AllDRBDSecrets(), 434 utils.GenerateSecret, 435 ec_id)
436
437 - def _AllLVs(self):
438 """Compute the list of all LVs. 439 440 """ 441 lvnames = set() 442 for instance in self._config_data.instances.values(): 443 node_data = instance.MapLVsByNode() 444 for lv_list in node_data.values(): 445 lvnames.update(lv_list) 446 return lvnames
447
448 - def _AllIDs(self, include_temporary):
449 """Compute the list of all UUIDs and names we have. 450 451 @type include_temporary: boolean 452 @param include_temporary: whether to include the _temporary_ids set 453 @rtype: set 454 @return: a set of IDs 455 456 """ 457 existing = set() 458 if include_temporary: 459 existing.update(self._temporary_ids.GetReserved()) 460 existing.update(self._AllLVs()) 461 existing.update(self._config_data.instances.keys()) 462 existing.update(self._config_data.nodes.keys()) 463 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid]) 464 return existing
465
466 - def _GenerateUniqueID(self, ec_id):
467 """Generate an unique UUID. 468 469 This checks the current node, instances and disk names for 470 duplicates. 471 472 @rtype: string 473 @return: the unique id 474 475 """ 476 existing = self._AllIDs(include_temporary=False) 477 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
478 479 @locking.ssynchronized(_config_lock, shared=1)
480 - def GenerateUniqueID(self, ec_id):
481 """Generate an unique ID. 482 483 This is just a wrapper over the unlocked version. 484 485 @type ec_id: string 486 @param ec_id: unique id for the job to reserve the id to 487 488 """ 489 return self._GenerateUniqueID(ec_id)
490
491 - def _AllMACs(self):
492 """Return all MACs present in the config. 493 494 @rtype: list 495 @return: the list of all MACs 496 497 """ 498 result = [] 499 for instance in self._config_data.instances.values(): 500 for nic in instance.nics: 501 result.append(nic.mac) 502 503 return result
504
505 - def _AllDRBDSecrets(self):
506 """Return all DRBD secrets present in the config. 507 508 @rtype: list 509 @return: the list of all DRBD secrets 510 511 """ 512 def helper(disk, result): 513 """Recursively gather secrets from this disk.""" 514 if disk.dev_type == constants.DT_DRBD8: 515 result.append(disk.logical_id[5]) 516 if disk.children: 517 for child in disk.children: 518 helper(child, result)
519 520 result = [] 521 for instance in self._config_data.instances.values(): 522 for disk in instance.disks: 523 helper(disk, result) 524 525 return result 526
527 - def _CheckDiskIDs(self, disk, l_ids, p_ids):
528 """Compute duplicate disk IDs 529 530 @type disk: L{objects.Disk} 531 @param disk: the disk at which to start searching 532 @type l_ids: list 533 @param l_ids: list of current logical ids 534 @type p_ids: list 535 @param p_ids: list of current physical ids 536 @rtype: list 537 @return: a list of error messages 538 539 """ 540 result = [] 541 if disk.logical_id is not None: 542 if disk.logical_id in l_ids: 543 result.append("duplicate logical id %s" % str(disk.logical_id)) 544 else: 545 l_ids.append(disk.logical_id) 546 if disk.physical_id is not None: 547 if disk.physical_id in p_ids: 548 result.append("duplicate physical id %s" % str(disk.physical_id)) 549 else: 550 p_ids.append(disk.physical_id) 551 552 if disk.children: 553 for child in disk.children: 554 result.extend(self._CheckDiskIDs(child, l_ids, p_ids)) 555 return result
556
  def _UnlockedVerifyConfig(self):
    """Verify function.

    Walks the whole in-memory configuration (cluster parameters,
    instances, nodes, node groups, DRBD minors and IP addresses) and
    collects every inconsistency found as a human-readable message.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    # accumulators for duplicate logical/physical disk id detection
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    # the following closures append to the enclosing "result" list
    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params, check_std):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params, check_std)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}), True)
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for (idx, dsk) in enumerate(instance.disks):
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

      wrong_names = _CheckInstanceDiskIvNames(instance.disks)
      if wrong_names:
        tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
                         (idx, exp_name, actual_name))
                        for (idx, exp_name, actual_name) in wrong_names)

        result.append("Instance '%s' has wrongly named disks: %s" %
                      (instance.name, tmp))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      # at most one of these mutually exclusive states may be set
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)
      used_globals = constants.NDC_GLOBALS.intersection(node.ndparams)
      if used_globals:
        result.append("Node '%s' has some global parameters set: %s" %
                      (node.name, utils.CommaJoin(used_globals)))

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy),
                      False)
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        # IPs are only considered duplicates within the same link/network
        _AddIpAddress("%s/%s/%s" % (link, nic.ip, nic.network),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
818 - def VerifyConfig(self):
819 """Verify function. 820 821 This is just a wrapper over L{_UnlockedVerifyConfig}. 822 823 @rtype: list 824 @return: a list of error messages; a non-empty list signifies 825 configuration errors 826 827 """ 828 return self._UnlockedVerifyConfig()
829
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    Side effect: sets C{disk.physical_id} in place (on the disk and on
    all its children).

    """
    # fix up children first, so a top-level-only call is sufficient
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    # physical id already computed and no logical id to derive it from
    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      # (ip, port) endpoints; local endpoint comes first in physical_id
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      # non-DRBD devices use the logical id unchanged
      disk.physical_id = disk.logical_id
    return
867 868 @locking.ssynchronized(_config_lock)
869 - def SetDiskID(self, disk, node_name):
870 """Convert the unique ID to the ID needed on the target nodes. 871 872 This is used only for drbd, which needs ip/port configuration. 873 874 The routine descends down and updates its children also, because 875 this helps when the only the top device is passed to the remote 876 node. 877 878 """ 879 return self._UnlockedSetDiskID(disk, node_name)
880 881 @locking.ssynchronized(_config_lock)
882 - def AddTcpUdpPort(self, port):
883 """Adds a new port to the available port pool. 884 885 @warning: this method does not "flush" the configuration (via 886 L{_WriteConfig}); callers should do that themselves once the 887 configuration is stable 888 889 """ 890 if not isinstance(port, int): 891 raise errors.ProgrammerError("Invalid type passed for port") 892 893 self._config_data.cluster.tcpudp_port_pool.add(port)
894 895 @locking.ssynchronized(_config_lock, shared=1)
896 - def GetPortList(self):
897 """Returns a copy of the current port list. 898 899 """ 900 return self._config_data.cluster.tcpudp_port_pool.copy()
901 902 @locking.ssynchronized(_config_lock)
903 - def AllocatePort(self):
904 """Allocate a port. 905 906 The port will be taken from the available port pool or from the 907 default port range (and in this case we increase 908 highest_used_port). 909 910 """ 911 # If there are TCP/IP ports configured, we use them first. 912 if self._config_data.cluster.tcpudp_port_pool: 913 port = self._config_data.cluster.tcpudp_port_pool.pop() 914 else: 915 port = self._config_data.cluster.highest_used_port + 1 916 if port >= constants.LAST_DRBD_PORT: 917 raise errors.ConfigurationError("The highest used port is greater" 918 " than %s. Aborting." % 919 constants.LAST_DRBD_PORT) 920 self._config_data.cluster.highest_used_port = port 921 922 self._WriteConfig() 923 return port
924
925 - def _UnlockedComputeDRBDMap(self):
926 """Compute the used DRBD minor/nodes. 927 928 @rtype: (dict, list) 929 @return: dictionary of node_name: dict of minor: instance_name; 930 the returned dict will have all the nodes in it (even if with 931 an empty list), and a list of duplicates; if the duplicates 932 list is not empty, the configuration is corrupted and its caller 933 should raise an exception 934 935 """ 936 def _AppendUsedPorts(instance_name, disk, used): 937 duplicates = [] 938 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5: 939 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5] 940 for node, port in ((node_a, minor_a), (node_b, minor_b)): 941 assert node in used, ("Node '%s' of instance '%s' not found" 942 " in node list" % (node, instance_name)) 943 if port in used[node]: 944 duplicates.append((node, port, instance_name, used[node][port])) 945 else: 946 used[node][port] = instance_name 947 if disk.children: 948 for child in disk.children: 949 duplicates.extend(_AppendUsedPorts(instance_name, child, used)) 950 return duplicates
951 952 duplicates = [] 953 my_dict = dict((node, {}) for node in self._config_data.nodes) 954 for instance in self._config_data.instances.itervalues(): 955 for disk in instance.disks: 956 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict)) 957 for (node, minor), instance in self._temporary_drbds.iteritems(): 958 if minor in my_dict[node] and my_dict[node][minor] != instance: 959 duplicates.append((node, minor, instance, my_dict[node][minor])) 960 else: 961 my_dict[node][minor] = instance 962 return my_dict, duplicates 963 964 @locking.ssynchronized(_config_lock)
def ComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  This is a locked wrapper over L{_UnlockedComputeDRBDMap}.

  @return: dictionary of node_name: dict of minor: instance_name; the
      returned dict will have all the nodes in it (even if with an
      empty list)
  @raise errors.ConfigurationError: if duplicate minors are detected

  """
  minors_map, duplicates = self._UnlockedComputeDRBDMap()
  if not duplicates:
    return minors_map
  raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                  str(duplicates))
@locking.ssynchronized(_config_lock)
def AllocateDRBDMinor(self, nodes, instance):
  """Allocate a drbd minor.

  The free minor is computed from the devices that already exist; a
  node may be passed several times in order to allocate several minors
  on it. The result is the list of minors, in the same order as the
  passed nodes.

  @type instance: string
  @param instance: the instance for which we allocate minors

  """
  assert isinstance(instance, basestring), \
    "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  result = []
  for node_name in nodes:
    node_minors = d_map[node_name]
    if not node_minors:
      # nothing allocated on this node yet, minor 0 is free
      candidate = 0
    else:
      in_use = sorted(node_minors.keys())
      candidate = utils.FirstFree(in_use)
      if candidate is None:
        # no gap in the allocated sequence, take the next minor
        # TODO: implement high-limit check
        candidate = in_use[-1] + 1
    # double-check minor against current instances
    assert candidate not in d_map[node_name], \
      ("Attempt to reuse allocated DRBD minor %d on node %s,"
       " already allocated to instance %s" %
       (candidate, node_name, d_map[node_name].get(candidate)))
    node_minors[candidate] = instance
    # double-check minor against reservation
    r_key = (node_name, candidate)
    assert r_key not in self._temporary_drbds, \
      ("Attempt to reuse reserved DRBD minor %d on node %s,"
       " reserved for instance %s" %
       (candidate, node_name, self._temporary_drbds.get(r_key)))
    self._temporary_drbds[r_key] = instance
    result.append(candidate)
  logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                nodes, result)
  return result
1036
def _UnlockedReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  @type instance: string
  @param instance: the instance for which temporary minors should be
                   released

  """
  assert isinstance(instance, basestring), \
    "Invalid argument passed to ReleaseDRBDMinors"
  # collect first, delete afterwards, so we never mutate the dict
  # while scanning it
  stale_keys = [key for key, name in self._temporary_drbds.items()
                if name == instance]
  for key in stale_keys:
    del self._temporary_drbds[key]
@locking.ssynchronized(_config_lock)
def ReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  This should be called on the error paths; on the success paths it is
  invoked automatically by the ConfigWriter add and update functions.

  This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

  @type instance: string
  @param instance: the instance for which temporary minors should be
                   released

  """
  self._UnlockedReleaseDRBDMinors(instance)
@locking.ssynchronized(_config_lock, shared=1)
def GetConfigVersion(self):
  """Return the version of the configuration data.

  @return: the configuration version

  """
  return self._config_data.version
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterName(self):
  """Return the name of this cluster.

  @return: the cluster name, as stored in the configuration

  """
  return self._config_data.cluster.cluster_name
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNode(self):
  """Return the hostname of the cluster's master node.

  @return: the master node's hostname

  """
  return self._config_data.cluster.master_node
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterIP(self):
  """Return the IP address of the cluster's master node.

  @return: the master IP

  """
  return self._config_data.cluster.master_ip
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetdev(self):
  """Return the network device used by the cluster's master IP.

  """
  return self._config_data.cluster.master_netdev
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetmask(self):
  """Return the netmask used by the cluster's master node.

  """
  return self._config_data.cluster.master_netmask
@locking.ssynchronized(_config_lock, shared=1)
def GetUseExternalMipScript(self):
  """Return whether an external master IP setup script should be used.

  """
  return self._config_data.cluster.use_external_mip_script
@locking.ssynchronized(_config_lock, shared=1)
def GetFileStorageDir(self):
  """Return the cluster's file storage directory.

  """
  return self._config_data.cluster.file_storage_dir
@locking.ssynchronized(_config_lock, shared=1)
def GetSharedFileStorageDir(self):
  """Return the cluster's shared file storage directory.

  """
  return self._config_data.cluster.shared_file_storage_dir
@locking.ssynchronized(_config_lock, shared=1)
def GetHypervisorType(self):
  """Return the cluster's default hypervisor type.

  This is the first entry of the enabled hypervisors list.

  """
  return self._config_data.cluster.enabled_hypervisors[0]
@locking.ssynchronized(_config_lock, shared=1)
def GetHostKey(self):
  """Return the rsa hostkey from the config.

  @rtype: string
  @return: the public rsa host key

  """
  return self._config_data.cluster.rsahostkeypub
@locking.ssynchronized(_config_lock, shared=1)
def GetDefaultIAllocator(self):
  """Return the cluster-wide default instance allocator.

  """
  return self._config_data.cluster.default_iallocator
@locking.ssynchronized(_config_lock, shared=1)
def GetPrimaryIPFamily(self):
  """Return the cluster's primary IP family.

  @return: primary ip family

  """
  return self._config_data.cluster.primary_ip_family
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetworkParameters(self):
  """Get network parameters of the master node.

  @rtype: L{objects.MasterNetworkParameters}
  @return: network parameters of the master node

  """
  cluster = self._config_data.cluster
  return objects.MasterNetworkParameters(
    name=cluster.master_node, ip=cluster.master_ip,
    netmask=cluster.master_netmask, netdev=cluster.master_netdev,
    ip_family=cluster.primary_ip_family)
@locking.ssynchronized(_config_lock)
def AddNodeGroup(self, group, ec_id, check_uuid=True):
  """Add a node group to the configuration and write it out.

  This method calls group.UpgradeConfig() to fill any missing attributes
  according to their default values.

  @type group: L{objects.NodeGroup}
  @param group: the NodeGroup object to add
  @type ec_id: string
  @param ec_id: unique id for the job to use when creating a missing UUID
  @type check_uuid: bool
  @param check_uuid: add an UUID to the group if it doesn't have one or,
      if it does, ensure that it does not exist in the configuration
      already

  """
  self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
  self._WriteConfig()
1207
def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
  """Add a node group to the configuration (lock already held).

  """
  logging.info("Adding node group %s to configuration", group.name)

  # Some code might need to add a node group with a pre-populated UUID
  # generated with ConfigWriter.GenerateUniqueID(); such callers may
  # bypass the "does this UUID exist already" check.
  if check_uuid:
    self._EnsureUUID(group, ec_id)

  try:
    existing_uuid = self._UnlockedLookupNodeGroup(group.name)
  except errors.OpPrereqError:
    existing_uuid = None
  if existing_uuid is not None:
    raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                               " node group (UUID: %s)" %
                               (group.name, existing_uuid),
                               errors.ECODE_EXISTS)

  group.serial_no = 1
  group.ctime = group.mtime = time.time()
  group.UpgradeConfig()

  self._config_data.nodegroups[group.uuid] = group
  self._config_data.cluster.serial_no += 1
@locking.ssynchronized(_config_lock)
def RemoveNodeGroup(self, group_uuid):
  """Remove a node group from the configuration.

  @type group_uuid: string
  @param group_uuid: the UUID of the node group to remove

  """
  logging.info("Removing node group %s from configuration", group_uuid)

  nodegroups = self._config_data.nodegroups
  if group_uuid not in nodegroups:
    raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

  # the last remaining group must never be deleted
  assert len(nodegroups) != 1, \
    "Group '%s' is the only group, cannot be removed" % group_uuid

  del nodegroups[group_uuid]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
1256
def _UnlockedLookupNodeGroup(self, target):
  """Lookup a node group's UUID.

  @type target: string or None
  @param target: group name or UUID or None to look for the default
  @rtype: string
  @return: nodegroup UUID
  @raises errors.OpPrereqError: when the target group cannot be found

  """
  if target is None:
    # None means "the default group", which is only well-defined when
    # exactly one group exists
    if len(self._config_data.nodegroups) != 1:
      # FIX: pass an error code, consistent with the ECODE_NOENT raise below
      raise errors.OpPrereqError("More than one node group exists. Target"
                                 " group must be specified explicitly.",
                                 errors.ECODE_INVAL)
    # works on both dict-as-list (py2) and dict views (py3)
    return next(iter(self._config_data.nodegroups))
  if target in self._config_data.nodegroups:
    return target
  # fall back to a lookup by group name
  for nodegroup in self._config_data.nodegroups.values():
    if nodegroup.name == target:
      return nodegroup.uuid
  raise errors.OpPrereqError("Node group '%s' not found" % target,
                             errors.ECODE_NOENT)
@locking.ssynchronized(_config_lock, shared=1)
def LookupNodeGroup(self, target):
  """Lookup a node group's UUID.

  This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

  @type target: string or None
  @param target: group name or UUID or None to look for the default
  @rtype: string
  @return: nodegroup UUID

  """
  return self._UnlockedLookupNodeGroup(target)
1294
def _UnlockedGetNodeGroup(self, uuid):
  """Lookup a node group by UUID.

  @type uuid: string
  @param uuid: group UUID
  @rtype: L{objects.NodeGroup} or None
  @return: nodegroup object, or None if not found

  """
  return self._config_data.nodegroups.get(uuid, None)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroup(self, uuid):
  """Lookup a node group (locked wrapper over L{_UnlockedGetNodeGroup}).

  @type uuid: string
  @param uuid: group UUID
  @rtype: L{objects.NodeGroup} or None
  @return: nodegroup object, or None if not found

  """
  return self._UnlockedGetNodeGroup(uuid)
@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodeGroupsInfo(self):
  """Get the configuration of all node groups.

  @rtype: dict
  @return: a shallow copy of the UUID-to-nodegroup mapping

  """
  return dict(self._config_data.nodegroups)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupList(self):
  """Get the list of node group UUIDs.

  """
  return list(self._config_data.nodegroups)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupMembersByNodes(self, nodes):
  """Get nodes which are member in the same nodegroups as the given nodes.

  @rtype: frozenset

  """
  members = set()
  for node_name in nodes:
    group_uuid = self._UnlockedGetNodeInfo(node_name).group
    members.update(self._UnlockedGetNodeGroup(group_uuid).members)
  return frozenset(members)
@locking.ssynchronized(_config_lock, shared=1)
def GetMultiNodeGroupInfo(self, group_uuids):
  """Get the configuration of multiple node groups.

  @param group_uuids: List of node group UUIDs
  @rtype: list
  @return: List of tuples of (group_uuid, group_info)

  """
  return [(gid, self._UnlockedGetNodeGroup(gid)) for gid in group_uuids]
@locking.ssynchronized(_config_lock)
def AddInstance(self, instance, ec_id):
  """Add an instance to the config.

  This should be used after creating a new instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object

  """
  if not isinstance(instance, objects.Instance):
    raise errors.ProgrammerError("Invalid type passed to AddInstance")

  if instance.disk_template != constants.DT_DISKLESS:
    all_lvs = instance.MapLVsByNode()
    logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

  # refuse MAC addresses that are already taken cluster-wide
  in_use_macs = self._AllMACs()
  for nic in instance.nics:
    if nic.mac in in_use_macs:
      raise errors.ConfigurationError("Cannot add instance %s:"
                                      " MAC address '%s' already in use." %
                                      (instance.name, nic.mac))

  self._EnsureUUID(instance, ec_id)

  instance.serial_no = 1
  instance.ctime = instance.mtime = time.time()
  self._config_data.instances[instance.name] = instance
  self._config_data.cluster.serial_no += 1
  # the instance is now committed; drop its temporary DRBD reservations
  # and commit any reserved IP addresses
  self._UnlockedReleaseDRBDMinors(instance.name)
  self._UnlockedCommitTemporaryIps(ec_id)
  self._WriteConfig()
1390
def _EnsureUUID(self, item, ec_id):
  """Ensures a given object has a valid UUID.

  @param item: the instance or node to be checked
  @param ec_id: the execution context id for the uuid reservation

  """
  if item.uuid:
    # an existing UUID must not clash with any known or reserved one
    if item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))
  else:
    item.uuid = self._GenerateUniqueID(ec_id)
1403
def _SetInstanceStatus(self, instance_name, status):
  """Set the instance's admin status to a given value.

  Writes the configuration out only when the status actually changes.

  """
  assert status in constants.ADMINST_ALL, \
    "Invalid status '%s' passed to SetInstanceStatus" % (status,)

  instance = self._config_data.instances.get(instance_name, None)
  if instance is None:
    raise errors.ConfigurationError("Unknown instance '%s'" %
                                    instance_name)
  if instance.admin_state != status:
    instance.admin_state = status
    instance.serial_no += 1
    instance.mtime = time.time()
    self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceUp(self, instance_name):
  """Mark the instance status as up in the config.

  """
  self._SetInstanceStatus(instance_name, constants.ADMINST_UP)
@locking.ssynchronized(_config_lock)
def MarkInstanceOffline(self, instance_name):
  """Mark the instance status as offline in the config.

  """
  self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)
@locking.ssynchronized(_config_lock)
def RemoveInstance(self, instance_name):
  """Remove the instance from the configuration.

  """
  if instance_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

  # If a network port has been allocated to the instance,
  # return it to the pool of free ports.
  inst = self._config_data.instances[instance_name]
  network_port = getattr(inst, "network_port", None)
  if network_port is not None:
    self._config_data.cluster.tcpudp_port_pool.add(network_port)

  instance = self._UnlockedGetInstanceInfo(instance_name)

  # Return all IP addresses to the respective address pools
  for nic in instance.nics:
    if nic.network and nic.ip:
      self._UnlockedCommitIp(constants.RELEASE_ACTION, nic.network, nic.ip)

  del self._config_data.instances[instance_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RenameInstance(self, old_name, new_name):
  """Rename an instance.

  This needs to be done in ConfigWriter and not by RemoveInstance
  combined with AddInstance as only we can guarantee an atomic
  rename.

  """
  if old_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

  # Operate on a copy so the original object survives a failure
  renamed = self._config_data.instances[old_name].Copy()
  renamed.name = new_name

  for (idx, disk) in enumerate(renamed.disks):
    if disk.dev_type == constants.LD_FILE:
      # rename the file paths in logical and physical id
      file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
      disk.logical_id = (disk.logical_id[0],
                         utils.PathJoin(file_storage_dir, renamed.name,
                                        "disk%s" % idx))
      disk.physical_id = disk.logical_id

  # Actually replace instance object
  del self._config_data.instances[old_name]
  self._config_data.instances[renamed.name] = renamed

  # Force update of ssconf files
  self._config_data.cluster.serial_no += 1

  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceDown(self, instance_name):
  """Mark the status of an instance as down in the configuration.

  """
  self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)
1501
def _UnlockedGetInstanceList(self):
  """Get the list of instance names.

  This function is for internal use, when the config lock is already held.

  """
  return list(self._config_data.instances)
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceList(self):
  """Get the list of instances.

  @return: array of instances, ex. ['instance2.example.com',
      'instance1.example.com']

  """
  return self._UnlockedGetInstanceList()
1519
def ExpandInstanceName(self, short_name):
  """Attempt to expand an incomplete instance name.

  """
  # Locking is done in L{ConfigWriter.GetInstanceList}
  all_names = self.GetInstanceList()
  return _MatchNameComponentIgnoreCase(short_name, all_names)
1526
def _UnlockedGetInstanceInfo(self, instance_name):
  """Returns information about an instance, or None if unknown.

  This function is for internal use, when the config lock is already held.

  """
  return self._config_data.instances.get(instance_name, None)
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
  """Returns information about an instance.

  It takes the information from the configuration file. Other information
  of an instance are taken from the live systems.

  @param instance_name: name of the instance, e.g.
      I{instance1.example.com}

  @rtype: L{objects.Instance}
  @return: the instance object

  """
  return self._UnlockedGetInstanceInfo(instance_name)
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceNodeGroups(self, instance_name, primary_only=False):
  """Returns set of node group UUIDs for instance's nodes.

  @rtype: frozenset

  """
  instance = self._UnlockedGetInstanceInfo(instance_name)
  if not instance:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

  nodes = [instance.primary_node] if primary_only else instance.all_nodes

  return frozenset(self._UnlockedGetNodeInfo(node_name).group
                   for node_name in nodes)
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceNetworks(self, instance_name):
  """Returns set of network UUIDs for instance's nics.

  @rtype: frozenset

  """
  instance = self._UnlockedGetInstanceInfo(instance_name)
  if not instance:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

  return frozenset(nic.network for nic in instance.nics if nic.network)
@locking.ssynchronized(_config_lock, shared=1)
def GetMultiInstanceInfo(self, instances):
  """Get the configuration of multiple instances.

  @param instances: list of instance names
  @rtype: list
  @return: list of tuples (instance, instance_info), where
      instance_info is what would GetInstanceInfo return for the
      node, while keeping the original order

  """
  return [(name, self._UnlockedGetInstanceInfo(name))
          for name in instances]
@locking.ssynchronized(_config_lock, shared=1)
def GetAllInstancesInfo(self):
  """Get the configuration of all instances.

  @rtype: dict
  @return: dict of (instance, instance_info), where instance_info is what
      would GetInstanceInfo return for the node

  """
  return dict((name, self._UnlockedGetInstanceInfo(name))
              for name in self._UnlockedGetInstanceList())
@locking.ssynchronized(_config_lock, shared=1)
def GetInstancesInfoByFilter(self, filter_fn):
  """Get instance configuration with a filter.

  @type filter_fn: callable
  @param filter_fn: Filter function receiving instance object as
      parameter, returning boolean. Important: this function is called
      while the configuration locks is held. It must not do any complex
      work or call functions potentially leading to a deadlock. Ideally
      it doesn't call any other functions and just compares instance
      attributes.

  """
  return dict((name, inst)
              for (name, inst) in self._config_data.instances.items()
              if filter_fn(inst))
@locking.ssynchronized(_config_lock)
def AddNode(self, node, ec_id):
  """Add a node to the configuration.

  @type node: L{objects.Node}
  @param node: a Node instance

  """
  logging.info("Adding node %s to configuration", node.name)

  self._EnsureUUID(node, ec_id)

  node.serial_no = 1
  node.ctime = node.mtime = time.time()
  # membership bookkeeping must happen before the node is registered
  self._UnlockedAddNodeToGroup(node.name, node.group)
  self._config_data.nodes[node.name] = node
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RemoveNode(self, node_name):
  """Remove a node from the configuration.

  """
  logging.info("Removing node %s from configuration", node_name)

  nodes = self._config_data.nodes
  if node_name not in nodes:
    raise errors.ConfigurationError("Unknown node '%s'" % node_name)

  self._UnlockedRemoveNodeFromGroup(nodes[node_name])
  del nodes[node_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
1666
def ExpandNodeName(self, short_name):
  """Attempt to expand an incomplete node name.

  """
  # Locking is done in L{ConfigWriter.GetNodeList}
  all_names = self.GetNodeList()
  return _MatchNameComponentIgnoreCase(short_name, all_names)
1673
def _UnlockedGetNodeInfo(self, node_name):
  """Get the configuration of a node, as stored in the config.

  This function is for internal use, when the config lock is already
  held.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object, or None if unknown

  """
  return self._config_data.nodes.get(node_name, None)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInfo(self, node_name):
  """Get the configuration of a node, as stored in the config.

  This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object

  """
  return self._UnlockedGetNodeInfo(node_name)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInstances(self, node_name):
  """Get the instances of a node, as stored in the config.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: (list, list)
  @return: a tuple with two lists: the primary and the secondary instances

  """
  primaries = []
  secondaries = []
  for inst in self._config_data.instances.values():
    if inst.primary_node == node_name:
      primaries.append(inst.name)
    if node_name in inst.secondary_nodes:
      secondaries.append(inst.name)
  return (primaries, secondaries)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupInstances(self, uuid, primary_only=False):
  """Get the instances of a node group.

  @param uuid: Node group UUID
  @param primary_only: Whether to only consider primary nodes
  @rtype: frozenset
  @return: List of instance names in node group

  """
  def _InstNodes(inst):
    # which of the instance's nodes are relevant for the lookup
    if primary_only:
      return [inst.primary_node]
    return inst.all_nodes

  return frozenset(inst.name
                   for inst in self._config_data.instances.values()
                   for node_name in _InstNodes(inst)
                   if self._UnlockedGetNodeInfo(node_name).group == uuid)
1743
def _UnlockedGetNodeList(self):
  """Return the list of node names in the configuration.

  This function is for internal use, when the config lock is already
  held.

  @rtype: list

  """
  return list(self._config_data.nodes)
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeList(self):
  """Return the list of nodes which are in the configuration.

  """
  return self._UnlockedGetNodeList()
1761
def _UnlockedGetOnlineNodeList(self):
  """Return the names of the nodes which are online.

  """
  result = []
  for name in self._UnlockedGetNodeList():
    node = self._UnlockedGetNodeInfo(name)
    if not node.offline:
      result.append(node.name)
  return result
@locking.ssynchronized(_config_lock, shared=1)
def GetOnlineNodeList(self):
  """Return the list of nodes which are online.

  """
  return self._UnlockedGetOnlineNodeList()
@locking.ssynchronized(_config_lock, shared=1)
def GetVmCapableNodeList(self):
  """Return the list of nodes which are vm capable.

  @rtype: list
  @return: names of all nodes whose C{vm_capable} flag is set

  """
  # FIX: the docstring previously claimed "not vm capable", copied from
  # the sibling GetNonVmCapableNodeList; the code has always returned
  # the vm-capable nodes.
  all_nodes = [self._UnlockedGetNodeInfo(node)
               for node in self._UnlockedGetNodeList()]
  return [node.name for node in all_nodes if node.vm_capable]
@locking.ssynchronized(_config_lock, shared=1)
def GetNonVmCapableNodeList(self):
  """Return the list of nodes which are not vm capable.

  """
  result = []
  for name in self._UnlockedGetNodeList():
    node = self._UnlockedGetNodeInfo(name)
    if not node.vm_capable:
      result.append(node.name)
  return result
@locking.ssynchronized(_config_lock, shared=1)
def GetMultiNodeInfo(self, nodes):
  """Get the configuration of multiple nodes.

  @param nodes: list of node names
  @rtype: list
  @return: list of tuples of (node, node_info), where node_info is
      what would GetNodeInfo return for the node, in the original
      order

  """
  return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodesInfo(self):
  """Get the configuration of all nodes.

  @rtype: dict
  @return: dict of (node, node_info), where node_info is what
      would GetNodeInfo return for the node

  """
  return self._UnlockedGetAllNodesInfo()
1818
def _UnlockedGetAllNodesInfo(self):
  """Gets configuration of all nodes.

  @note: See L{GetAllNodesInfo}

  """
  return dict((name, self._UnlockedGetNodeInfo(name))
              for name in self._UnlockedGetNodeList())
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeGroupsFromNodes(self, nodes):
  """Returns groups for a list of nodes.

  @type nodes: list of string
  @param nodes: List of node names
  @rtype: frozenset

  """
  return frozenset(self._UnlockedGetNodeInfo(name).group
                   for name in nodes)
1838
def _UnlockedGetMasterCandidateStats(self, exceptions=None):
  """Get the number of current and maximum desired and possible candidates.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: tuple
  @return: tuple of (current, desired and possible, possible)

  """
  mc_now = 0
  mc_max = 0
  for node in self._config_data.nodes.values():
    if exceptions and node.name in exceptions:
      continue
    # only healthy, master-capable nodes count towards the pool
    if node.master_capable and not (node.offline or node.drained):
      mc_max += 1
      if node.master_candidate:
        mc_now += 1
  mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
  return (mc_now, mc_should, mc_max)
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterCandidateStats(self, exceptions=None):
  """Get the number of current and maximum possible candidates.

  This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: tuple
  @return: tuple of (current, desired and possible, possible)

  """
  # FIX: the docstring previously claimed a 2-tuple "(current, max)";
  # the wrapped function returns a 3-tuple.
  return self._UnlockedGetMasterCandidateStats(exceptions)
@locking.ssynchronized(_config_lock)
def MaintainCandidatePool(self, exceptions):
  """Try to grow the candidate pool to the desired size.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: list
  @return: list with the adjusted nodes (L{objects.Node} instances)

  """
  # _UnlockedGetMasterCandidateStats returns (current, desired, possible);
  # we grow the pool towards the *desired* size, not the absolute maximum
  mc_now, mc_should, _ = self._UnlockedGetMasterCandidateStats(exceptions)
  mod_list = []
  if mc_now < mc_should:
    node_names = self._config_data.nodes.keys()
    random.shuffle(node_names)
    for name in node_names:
      if mc_now >= mc_should:
        break
      node = self._config_data.nodes[name]
      if (node.master_candidate or node.offline or node.drained or
          node.name in exceptions or not node.master_capable):
        continue
      mod_list.append(node)
      node.master_candidate = True
      node.serial_no += 1
      mc_now += 1
  if mc_now != mc_should:
    # this should not happen
    logging.warning("Warning: MaintainCandidatePool didn't manage to"
                    " fill the candidate pool (%d/%d)", mc_now, mc_should)
  if mod_list:
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  return mod_list
1908
def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
  """Add a given node to the specified group.

  """
  if nodegroup_uuid not in self._config_data.nodegroups:
    # This can happen if a node group gets deleted between its lookup and
    # when we're adding the first node to it, since we don't keep a lock
    # in the meantime. It's ok though, as we'll fail cleanly if the node
    # group is not found anymore.
    raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
  members = self._config_data.nodegroups[nodegroup_uuid].members
  if node_name not in members:
    members.append(node_name)
1921
def _UnlockedRemoveNodeFromGroup(self, node):
  """Remove a given node from its group.

  """
  nodegroup = node.group
  if nodegroup not in self._config_data.nodegroups:
    logging.warning("Warning: node '%s' has unknown node group '%s'"
                    " (while being removed from it)", node.name, nodegroup)
    # FIX: previously execution fell through to the dictionary lookup
    # below, which is guaranteed to raise KeyError for an unknown group;
    # with no group object there is nothing to update, so stop here
    return
  nodegroup_obj = self._config_data.nodegroups[nodegroup]
  if node.name not in nodegroup_obj.members:
    logging.warning("Warning: node '%s' not a member of its node group '%s'"
                    " (while being removed from it)", node.name, nodegroup)
  else:
    nodegroup_obj.members.remove(node.name)
  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    Works in two passes: first every requested move is resolved and
    validated (so either all moves are possible or none is applied), then
    the changes are made and serials/timestamps updated once per object.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications
    @raise errors.ConfigurationError: if a node or group cannot be found

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.uuid != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()
2009
2010 - def _BumpSerialNo(self):
2011 """Bump up the serial number of the config. 2012 2013 """ 2014 self._config_data.serial_no += 1 2015 self._config_data.mtime = time.time()
2016
2017 - def _AllUUIDObjects(self):
2018 """Returns all objects with uuid attributes. 2019 2020 """ 2021 return (self._config_data.instances.values() + 2022 self._config_data.nodes.values() + 2023 self._config_data.nodegroups.values() + 2024 self._config_data.networks.values() + 2025 [self._config_data.cluster])
2026
  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    Loads and validates the on-disk configuration, refusing to open a
    configuration that names another node as master unless
    C{accept_foreign} is set.

    @type accept_foreign: bool
    @param accept_foreign: whether a configuration denoting a different
        node as master may be opened
    @raise errors.ConfigurationError: on parse errors, incomplete or
        foreign configurations

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # Upgrade configuration if needed
    self._UpgradeConfig()

    # Remember the file identity so concurrent modifications are detected
    # on the next write
    self._cfg_id = utils.GetFileID(path=self._cfg_file)
2062
  def _UpgradeConfig(self):
    """Run any upgrade steps.

    This method performs both in-object upgrades and also update some data
    elements that need uniqueness across the whole configuration or interact
    with other objects.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    # Keep a copy of the persistent part of _config_data to check for changes
    # Serialization doesn't guarantee order in dictionaries
    oldconf = copy.deepcopy(self._config_data.ToDict())

    # In-object upgrades
    self._config_data.UpgradeConfig()

    # Ensure every top-level object carries a UUID
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
    # Configurations predating node groups get the initial default group
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)

    # Only rewrite the file if the persistent data actually changed
    modified = (oldconf != self._config_data.ToDict())
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)
2107
2108 - def _DistributeConfig(self, feedback_fn):
2109 """Distribute the configuration to the other nodes. 2110 2111 Currently, this only copies the configuration file. In the future, 2112 it could be used to encapsulate the 2/3-phase update mechanism. 2113 2114 """ 2115 if self._offline: 2116 return True 2117 2118 bad = False 2119 2120 node_list = [] 2121 addr_list = [] 2122 myhostname = self._my_hostname 2123 # we can skip checking whether _UnlockedGetNodeInfo returns None 2124 # since the node list comes from _UnlocketGetNodeList, and we are 2125 # called with the lock held, so no modifications should take place 2126 # in between 2127 for node_name in self._UnlockedGetNodeList(): 2128 if node_name == myhostname: 2129 continue 2130 node_info = self._UnlockedGetNodeInfo(node_name) 2131 if not node_info.master_candidate: 2132 continue 2133 node_list.append(node_info.name) 2134 addr_list.append(node_info.primary_ip) 2135 2136 # TODO: Use dedicated resolver talking to config writer for name resolution 2137 result = \ 2138 self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file) 2139 for to_node, to_result in result.items(): 2140 msg = to_result.fail_msg 2141 if msg: 2142 msg = ("Copy of file %s to node %s failed: %s" % 2143 (self._cfg_file, to_node, msg)) 2144 logging.error(msg) 2145 2146 if feedback_fn: 2147 feedback_fn(msg) 2148 2149 bad = True 2150 2151 return not bad
2152
  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    @param destination: target path; defaults to the configured config file
    @param feedback_fn: optional callable invoked with warning/error
        messages produced while saving or distributing the configuration
    @raise errors.ConfigurationError: if the on-disk file changed since the
        last read/write (detected via the stored file ID)

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      # SafeWriteFile raises LockError if the file ID no longer matches,
      # i.e. someone else modified the file since we last read it
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      # Re-read the file ID from the (still open) fd for the next write
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no
2212
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value
    @raise errors.ConfigurationError: if any computed value is not a string

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    # "name ip" pairs, one node per line
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    # "uuid name" pairs for node groups and networks
    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))
    networks = ["%s %s" % (net.uuid, net.name) for net in
                self._config_data.networks.values()]
    networks_data = fn(utils.NiceSort(networks))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      constants.SS_NETWORKS: networks_data,
      }
    # ssconf files can only hold strings; catch programming errors early
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values
2286 2287 @locking.ssynchronized(_config_lock, shared=1)
2288 - def GetSsconfValues(self):
2289 """Wrapper using lock around _UnlockedGetSsconf(). 2290 2291 """ 2292 return self._UnlockedGetSsconfValues()
2293 2294 @locking.ssynchronized(_config_lock, shared=1)
2295 - def GetVGName(self):
2296 """Return the volume group name. 2297 2298 """ 2299 return self._config_data.cluster.volume_group_name
2300 2301 @locking.ssynchronized(_config_lock)
2302 - def SetVGName(self, vg_name):
2303 """Set the volume group name. 2304 2305 """ 2306 self._config_data.cluster.volume_group_name = vg_name 2307 self._config_data.cluster.serial_no += 1 2308 self._WriteConfig()
2309 2310 @locking.ssynchronized(_config_lock, shared=1)
2311 - def GetDRBDHelper(self):
2312 """Return DRBD usermode helper. 2313 2314 """ 2315 return self._config_data.cluster.drbd_usermode_helper
2316 2317 @locking.ssynchronized(_config_lock)
2318 - def SetDRBDHelper(self, drbd_helper):
2319 """Set DRBD usermode helper. 2320 2321 """ 2322 self._config_data.cluster.drbd_usermode_helper = drbd_helper 2323 self._config_data.cluster.serial_no += 1 2324 self._WriteConfig()
2325 2326 @locking.ssynchronized(_config_lock, shared=1)
2327 - def GetMACPrefix(self):
2328 """Return the mac prefix. 2329 2330 """ 2331 return self._config_data.cluster.mac_prefix
2332 2333 @locking.ssynchronized(_config_lock, shared=1)
2334 - def GetClusterInfo(self):
2335 """Returns information about the cluster 2336 2337 @rtype: L{objects.Cluster} 2338 @return: the cluster object 2339 2340 """ 2341 return self._config_data.cluster
2342 2343 @locking.ssynchronized(_config_lock, shared=1)
2344 - def HasAnyDiskOfType(self, dev_type):
2345 """Check if in there is at disk of the given type in the configuration. 2346 2347 """ 2348 return self._config_data.HasAnyDiskOfType(dev_type)
  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn, ec_id=None):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function
    @param ec_id: optional execution-context id; when given, IP
        reservations made under it are committed
    @raise errors.ProgrammerError: if the configuration was not read or
        an unsupported object type is passed
    @raise errors.ConfigurationError: if the target is not the object
        currently held in the configuration (stale reference)

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    # Identity check: the passed object must be the very object stored in
    # the configuration, otherwise the caller holds a stale copy
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    elif isinstance(target, objects.Network):
      test = target in self._config_data.networks.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      # instance changes may have released DRBD minors
      self._UnlockedReleaseDRBDMinors(target.name)

    if ec_id is not None:
      # Commit all ips reserved by OpInstanceSetParams and OpGroupSetParams
      self._UnlockedCommitTemporaryIps(ec_id)

    self._WriteConfig(feedback_fn=feedback_fn)
2403 2404 @locking.ssynchronized(_config_lock)
2405 - def DropECReservations(self, ec_id):
2406 """Drop per-execution-context reservations 2407 2408 """ 2409 for rm in self._all_rms: 2410 rm.DropECReservations(ec_id)
2411 2412 @locking.ssynchronized(_config_lock, shared=1)
2413 - def GetAllNetworksInfo(self):
2414 """Get configuration info of all the networks. 2415 2416 """ 2417 return dict(self._config_data.networks)
2418
2419 - def _UnlockedGetNetworkList(self):
2420 """Get the list of networks. 2421 2422 This function is for internal use, when the config lock is already held. 2423 2424 """ 2425 return self._config_data.networks.keys()
2426 2427 @locking.ssynchronized(_config_lock, shared=1)
2428 - def GetNetworkList(self):
2429 """Get the list of networks. 2430 2431 @return: array of networks, ex. ["main", "vlan100", "200] 2432 2433 """ 2434 return self._UnlockedGetNetworkList()
2435 2436 @locking.ssynchronized(_config_lock, shared=1)
2437 - def GetNetworkNames(self):
2438 """Get a list of network names 2439 2440 """ 2441 names = [net.name 2442 for net in self._config_data.networks.values()] 2443 return names
2444
2445 - def _UnlockedGetNetwork(self, uuid):
2446 """Returns information about a network. 2447 2448 This function is for internal use, when the config lock is already held. 2449 2450 """ 2451 if uuid not in self._config_data.networks: 2452 return None 2453 2454 return self._config_data.networks[uuid]
2455 2456 @locking.ssynchronized(_config_lock, shared=1)
2457 - def GetNetwork(self, uuid):
2458 """Returns information about a network. 2459 2460 It takes the information from the configuration file. 2461 2462 @param uuid: UUID of the network 2463 2464 @rtype: L{objects.Network} 2465 @return: the network object 2466 2467 """ 2468 return self._UnlockedGetNetwork(uuid)
2469 2470 @locking.ssynchronized(_config_lock)
2471 - def AddNetwork(self, net, ec_id, check_uuid=True):
2472 """Add a network to the configuration. 2473 2474 @type net: L{objects.Network} 2475 @param net: the Network object to add 2476 @type ec_id: string 2477 @param ec_id: unique id for the job to use when creating a missing UUID 2478 2479 """ 2480 self._UnlockedAddNetwork(net, ec_id, check_uuid) 2481 self._WriteConfig()
2482
2483 - def _UnlockedAddNetwork(self, net, ec_id, check_uuid):
2484 """Add a network to the configuration. 2485 2486 """ 2487 logging.info("Adding network %s to configuration", net.name) 2488 2489 if check_uuid: 2490 self._EnsureUUID(net, ec_id) 2491 2492 net.serial_no = 1 2493 net.ctime = net.mtime = time.time() 2494 self._config_data.networks[net.uuid] = net 2495 self._config_data.cluster.serial_no += 1
2496
2497 - def _UnlockedLookupNetwork(self, target):
2498 """Lookup a network's UUID. 2499 2500 @type target: string 2501 @param target: network name or UUID 2502 @rtype: string 2503 @return: network UUID 2504 @raises errors.OpPrereqError: when the target network cannot be found 2505 2506 """ 2507 if target is None: 2508 return None 2509 if target in self._config_data.networks: 2510 return target 2511 for net in self._config_data.networks.values(): 2512 if net.name == target: 2513 return net.uuid 2514 raise errors.OpPrereqError("Network '%s' not found" % target, 2515 errors.ECODE_NOENT)
2516 2517 @locking.ssynchronized(_config_lock, shared=1)
2518 - def LookupNetwork(self, target):
2519 """Lookup a network's UUID. 2520 2521 This function is just a wrapper over L{_UnlockedLookupNetwork}. 2522 2523 @type target: string 2524 @param target: network name or UUID 2525 @rtype: string 2526 @return: network UUID 2527 2528 """ 2529 return self._UnlockedLookupNetwork(target)
2530 2531 @locking.ssynchronized(_config_lock)
2532 - def RemoveNetwork(self, network_uuid):
2533 """Remove a network from the configuration. 2534 2535 @type network_uuid: string 2536 @param network_uuid: the UUID of the network to remove 2537 2538 """ 2539 logging.info("Removing network %s from configuration", network_uuid) 2540 2541 if network_uuid not in self._config_data.networks: 2542 raise errors.ConfigurationError("Unknown network '%s'" % network_uuid) 2543 2544 del self._config_data.networks[network_uuid] 2545 self._config_data.cluster.serial_no += 1 2546 self._WriteConfig()
2547
2548 - def _UnlockedGetGroupNetParams(self, net_uuid, node):
2549 """Get the netparams (mode, link) of a network. 2550 2551 Get a network's netparams for a given node. 2552 2553 @type net_uuid: string 2554 @param net_uuid: network uuid 2555 @type node: string 2556 @param node: node name 2557 @rtype: dict or None 2558 @return: netparams 2559 2560 """ 2561 node_info = self._UnlockedGetNodeInfo(node) 2562 nodegroup_info = self._UnlockedGetNodeGroup(node_info.group) 2563 netparams = nodegroup_info.networks.get(net_uuid, None) 2564 2565 return netparams
2566 2567 @locking.ssynchronized(_config_lock, shared=1)
2568 - def GetGroupNetParams(self, net_uuid, node):
2569 """Locking wrapper of _UnlockedGetGroupNetParams() 2570 2571 """ 2572 return self._UnlockedGetGroupNetParams(net_uuid, node)
2573 2574 @locking.ssynchronized(_config_lock, shared=1)
2575 - def CheckIPInNodeGroup(self, ip, node):
2576 """Check IP uniqueness in nodegroup. 2577 2578 Check networks that are connected in the node's node group 2579 if ip is contained in any of them. Used when creating/adding 2580 a NIC to ensure uniqueness among nodegroups. 2581 2582 @type ip: string 2583 @param ip: ip address 2584 @type node: string 2585 @param node: node name 2586 @rtype: (string, dict) or (None, None) 2587 @return: (network name, netparams) 2588 2589 """ 2590 if ip is None: 2591 return (None, None) 2592 node_info = self._UnlockedGetNodeInfo(node) 2593 nodegroup_info = self._UnlockedGetNodeGroup(node_info.group) 2594 for net_uuid in nodegroup_info.networks.keys(): 2595 net_info = self._UnlockedGetNetwork(net_uuid) 2596 pool = network.AddressPool(net_info) 2597 if pool.Contains(ip): 2598 return (net_info.name, nodegroup_info.networks[net_uuid]) 2599 2600 return (None, None)
2601