
Source Code for Module ganeti.config

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was too slow and consumed
too much memory.

"""
import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    resource)
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
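
# A minimal usage sketch (not part of the module): an execution context,
# identified here by the made-up id "job-42", reserves resources so that
# concurrent jobs cannot take them, then drops them when done:
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-42", "res-a")
#   trm.Reserved("res-a")                                # True
#   trm.Generate(set(["res-a"]), lambda: "res-b", "job-42")  # "res-b"
#   trm.DropECReservations("job-42")   # releases everything held by the job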


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one mac address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)
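
  # For illustration: with a made-up cluster prefix of "aa:00:00", the
  # helper above would produce addresses such as "aa:00:00:3f:07:c2" --
  # the three-byte cluster prefix followed by three random bytes.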

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return
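
  # A worked example of the translation above, with invented values: for a
  # DRBD8 disk with logical_id ("node1", "node2", 11000, 0, 1, "s3cr3t"),
  # where node1/node2 have secondary IPs 192.0.2.1/192.0.2.2, calling
  # _UnlockedSetDiskID(disk, "node1") yields
  #   physical_id = ("192.0.2.1", 11000, "192.0.2.2", 11000, 0, "s3cr3t")
  # i.e. (own ip, port), (peer ip, port), own minor, shared secret.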

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
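
  # Shape of the result, with invented names: on a healthy three-node
  # cluster the method might return
  #   ({"node1": {0: "inst1", 1: "inst2"},
  #     "node2": {0: "inst1"},
  #     "node3": {}},
  #    [])
  # where the second element (the duplicates list) is empty.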

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
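
  # A hedged usage sketch with invented names: allocating the two minors
  # for a new DRBD8 disk of "inst1" asks for one minor per node, in order:
  #
  #   minors = cfg.AllocateDRBDMinor(["node1", "node2"], "inst1")
  #   # e.g. minors == [0, 2] if node1 had no minor in use and node2's
  #   # first free minor was 2; on error paths the caller must undo the
  #   # reservation with cfg.ReleaseDRBDMinors("inst1").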

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk_fname = "disk%s" % disk.iv_name.split("/")[1]
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk_fname))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    of an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)
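
  # A worked example with invented numbers: five nodes, of which one is
  # offline and one drained, two healthy nodes are already candidates, and
  # candidate_pool_size is 10. Then mc_now = 2, mc_max = 3 (the healthy
  # nodes), mc_should = min(3, 10) = 3, so the method returns (2, 3, 3)
  # and _UnlockedVerifyConfig would report a missing master candidate.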

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # note: the second element of the stats tuple is the desired pool size
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      }
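
  # For orientation, a fragment of the returned dictionary on a made-up
  # two-node cluster:
  #
  #   {constants.SS_CLUSTER_NAME: "cluster.example.com",
  #    constants.SS_MASTER_NODE: "node1.example.com",
  #    constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #    ...}
  #
  # Every value is a plain string, since each entry ends up written
  # verbatim to its own ssconf file.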

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is a disk of the given type in the configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)
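
  # The intended calling pattern, sketched with hypothetical names:
  #
  #   node = cfg.GetNodeInfo("node1.example.com")  # live config object
  #   node.offline = True                          # mutate it in place
  #   cfg.Update(node, None)                       # bump serials and persist
  #
  # Passing an object that was not obtained from this ConfigWriter raises
  # ConfigurationError, since the membership checks above would fail.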

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)