22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils


_config_lock = locking.SharedLock("ConfigWriter")


_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
57 """Verifies that a configuration objects looks valid.
58
59 This only verifies the version of the configuration.
60
61 @raise errors.ConfigurationError: if the version differs from what
62 we expect
63
64 """
65 if data.version != constants.CONFIG_VERSION:
66 raise errors.ConfigurationError("Cluster configuration version"
67 " mismatch, got %s instead of %s" %
68 (data.version,
69 constants.CONFIG_VERSION))
70
73 """A temporary resource reservation manager.
74
75 This is used to reserve resources in a job, before using them, making sure
76 other jobs cannot get them in the meantime.
77
78 """
80 self._ec_reserved = {}
81
83 for holder_reserved in self._ec_reserved.items():
84 if resource in holder_reserved:
85 return True
86 return False
87
88 - def Reserve(self, ec_id, resource):
89 if self.Reserved(resource):
90 raise errors.ReservationError("Duplicate reservation for resource: %s." %
91 (resource))
92 if ec_id not in self._ec_reserved:
93 self._ec_reserved[ec_id] = set([resource])
94 else:
95 self._ec_reserved[ec_id].add(resource)
96
98 if ec_id in self._ec_reserved:
99 del self._ec_reserved[ec_id]
100
102 all_reserved = set()
103 for holder_reserved in self._ec_reserved.values():
104 all_reserved.update(holder_reserved)
105 return all_reserved
106
107 - def Generate(self, existing, generate_one_fn, ec_id):
108 """Generate a new resource of this type
109
110 """
111 assert callable(generate_one_fn)
112
113 all_elems = self.GetReserved()
114 all_elems.update(existing)
115 retries = 64
116 while retries > 0:
117 new_resource = generate_one_fn()
118 if new_resource is not None and new_resource not in all_elems:
119 break
120 else:
121 raise errors.ConfigurationError("Not able generate new resource"
122 " (last tried: %s)" % new_resource)
123 self.Reserve(ec_id, new_resource)
124 return new_resource
125
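# A minimal usage sketch of TemporaryReservationManager (illustrative only;
# real ec_id values come from the job/opcode execution context):
#
#   rm = TemporaryReservationManager()
#   rm.Reserve("job-42", "aa:00:00:11:22:33")
#   rm.Reserved("aa:00:00:11:22:33")   # -> True
#   rm.DropECReservations("job-42")    # releases everything "job-42" held
#   rm.Reserved("aa:00:00:11:22:33")   # -> False
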
128 """The interface to the cluster configuration.
129
130 @ivar _temporary_lvs: reservation manager for temporary LVs
131 @ivar _all_rms: a list of all temporary reservation managers
132
133 """
134 - def __init__(self, cfg_file=None, offline=False):
157
158
159 @staticmethod
165
167 """Generate one mac address
168
169 """
170 prefix = self._config_data.cluster.mac_prefix
171 byte1 = random.randrange(0, 256)
172 byte2 = random.randrange(0, 256)
173 byte3 = random.randrange(0, 256)
174 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
175 return mac
176
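  # Illustrative only: with a cluster mac_prefix of "aa:00:00", the helper
  # above may return something like "aa:00:00:3f:91:0c"; uniqueness against
  # existing MACs is enforced by GenerateMAC below, not here.
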
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

256 """Generate an unique UUID.
257
258 This checks the current node, instances and disk names for
259 duplicates.
260
261 @rtype: string
262 @return: the unique id
263
264 """
265 existing = self._AllIDs(include_temporary=False)
266 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
267
268 @locking.ssynchronized(_config_lock, shared=1)
270 """Generate an unique ID.
271
272 This is just a wrapper over the unlocked version.
273
274 @type ec_id: string
275 @param ec_id: unique id for the job to reserve the id to
276
277 """
278 return self._GenerateUniqueID(ec_id)
279
281 """Return all MACs present in the config.
282
283 @rtype: list
284 @return: the list of all MACs
285
286 """
287 result = []
288 for instance in self._config_data.instances.values():
289 for nic in instance.nics:
290 result.append(nic.mac)
291
292 return result
293
295 """Return all DRBD secrets present in the config.
296
297 @rtype: list
298 @return: the list of all DRBD secrets
299
300 """
301 def helper(disk, result):
302 """Recursively gather secrets from this disk."""
303 if disk.dev_type == constants.DT_DRBD8:
304 result.append(disk.logical_id[5])
305 if disk.children:
306 for child in disk.children:
307 helper(child, result)
308
309 result = []
310 for instance in self._config_data.instances.values():
311 for disk in instance.disks:
312 helper(disk, result)
313
314 return result
315
317 """Compute duplicate disk IDs
318
319 @type disk: L{objects.Disk}
320 @param disk: the disk at which to start searching
321 @type l_ids: list
322 @param l_ids: list of current logical ids
323 @type p_ids: list
324 @param p_ids: list of current physical ids
325 @rtype: list
326 @return: a list of error messages
327
328 """
329 result = []
330 if disk.logical_id is not None:
331 if disk.logical_id in l_ids:
332 result.append("duplicate logical id %s" % str(disk.logical_id))
333 else:
334 l_ids.append(disk.logical_id)
335 if disk.physical_id is not None:
336 if disk.physical_id in p_ids:
337 result.append("duplicate physical id %s" % str(disk.physical_id))
338 else:
339 p_ids.append(disk.physical_id)
340
341 if disk.children:
342 for child in disk.children:
343 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
344 return result
345
347 """Verify function.
348
349 @rtype: list
350 @return: a list of error messages; a non-empty list signifies
351 configuration errors
352
353 """
354 result = []
355 seen_macs = []
356 ports = {}
357 data = self._config_data
358 seen_lids = []
359 seen_pids = []
360
361
362 if not data.cluster.enabled_hypervisors:
363 result.append("enabled hypervisors list doesn't have any entries")
364 invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
365 if invalid_hvs:
366 result.append("enabled hypervisors contains invalid entries: %s" %
367 invalid_hvs)
368 missing_hvp = (set(data.cluster.enabled_hypervisors) -
369 set(data.cluster.hvparams.keys()))
370 if missing_hvp:
371 result.append("hypervisor parameters missing for the enabled"
372 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
373
374 if data.cluster.master_node not in data.nodes:
375 result.append("cluster has invalid primary node '%s'" %
376 data.cluster.master_node)
377
378
379 for instance_name in data.instances:
380 instance = data.instances[instance_name]
381 if instance.name != instance_name:
382 result.append("instance '%s' is indexed by wrong name '%s'" %
383 (instance.name, instance_name))
384 if instance.primary_node not in data.nodes:
385 result.append("instance '%s' has invalid primary node '%s'" %
386 (instance_name, instance.primary_node))
387 for snode in instance.secondary_nodes:
388 if snode not in data.nodes:
389 result.append("instance '%s' has invalid secondary node '%s'" %
390 (instance_name, snode))
391 for idx, nic in enumerate(instance.nics):
392 if nic.mac in seen_macs:
393 result.append("instance '%s' has NIC %d mac %s duplicate" %
394 (instance_name, idx, nic.mac))
395 else:
396 seen_macs.append(nic.mac)
397
398
399 for dsk in instance.disks:
400 if dsk.dev_type in constants.LDS_DRBD:
401 tcp_port = dsk.logical_id[2]
402 if tcp_port not in ports:
403 ports[tcp_port] = []
404 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
405
406 net_port = getattr(instance, "network_port", None)
407 if net_port is not None:
408 if net_port not in ports:
409 ports[net_port] = []
410 ports[net_port].append((instance.name, "network port"))
411
412
413 for idx, disk in enumerate(instance.disks):
414 result.extend(["instance '%s' disk %d error: %s" %
415 (instance.name, idx, msg) for msg in disk.Verify()])
416 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
417
418
419 for free_port in data.cluster.tcpudp_port_pool:
420 if free_port not in ports:
421 ports[free_port] = []
422 ports[free_port].append(("cluster", "port marked as free"))
423
424
425 keys = ports.keys()
426 keys.sort()
427 for pnum in keys:
428 pdata = ports[pnum]
429 if len(pdata) > 1:
430 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
431 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
432
433
434 if keys:
435 if keys[-1] > data.cluster.highest_used_port:
436 result.append("Highest used port mismatch, saved %s, computed %s" %
437 (data.cluster.highest_used_port, keys[-1]))
438
439 if not data.nodes[data.cluster.master_node].master_candidate:
440 result.append("Master node is not a master candidate")
441
442
443 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
444 if mc_now < mc_max:
445 result.append("Not enough master candidates: actual %d, target %d" %
446 (mc_now, mc_max))
447
448
449 for node_name, node in data.nodes.items():
450 if node.name != node_name:
451 result.append("Node '%s' is indexed by wrong name '%s'" %
452 (node.name, node_name))
453 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
454 result.append("Node %s state is invalid: master_candidate=%s,"
455 " drain=%s, offline=%s" %
456 (node.name, node.master_candidate, node.drained,
457 node.offline))
458
459
460 _, duplicates = self._UnlockedComputeDRBDMap()
461 for node, minor, instance_a, instance_b in duplicates:
462 result.append("DRBD minor %d on node %s is assigned twice to instances"
463 " %s and %s" % (minor, node, instance_a, instance_b))
464
465
466 default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
467 ips = {}
468
469 def _AddIpAddress(ip, name):
470 ips.setdefault(ip, []).append(name)
471
472 _AddIpAddress(data.cluster.master_ip, "cluster_ip")
473
474 for node in data.nodes.values():
475 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
476 if node.secondary_ip != node.primary_ip:
477 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
478
479 for instance in data.instances.values():
480 for idx, nic in enumerate(instance.nics):
481 if nic.ip is None:
482 continue
483
484 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
485 nic_mode = nicparams[constants.NIC_MODE]
486 nic_link = nicparams[constants.NIC_LINK]
487
488 if nic_mode == constants.NIC_MODE_BRIDGED:
489 link = "bridge:%s" % nic_link
490 elif nic_mode == constants.NIC_MODE_ROUTED:
491 link = "route:%s" % nic_link
492 else:
493 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
494
495 _AddIpAddress("%s/%s" % (link, nic.ip),
496 "instance:%s/nic:%d" % (instance.name, idx))
497
498 for ip, owners in ips.items():
499 if len(owners) > 1:
500 result.append("IP address %s is used by multiple owners: %s" %
501 (ip, utils.CommaJoin(owners)))
502
503 return result
504
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

519 """Convert the unique ID to the ID needed on the target nodes.
520
521 This is used only for drbd, which needs ip/port configuration.
522
523 The routine descends down and updates its children also, because
524 this helps when the only the top device is passed to the remote
525 node.
526
527 This function is for internal use, when the config lock is already held.
528
529 """
530 if disk.children:
531 for child in disk.children:
532 self._UnlockedSetDiskID(child, node_name)
533
534 if disk.logical_id is None and disk.physical_id is not None:
535 return
536 if disk.dev_type == constants.LD_DRBD8:
537 pnode, snode, port, pminor, sminor, secret = disk.logical_id
538 if node_name not in (pnode, snode):
539 raise errors.ConfigurationError("DRBD device not knowing node %s" %
540 node_name)
541 pnode_info = self._UnlockedGetNodeInfo(pnode)
542 snode_info = self._UnlockedGetNodeInfo(snode)
543 if pnode_info is None or snode_info is None:
544 raise errors.ConfigurationError("Can't find primary or secondary node"
545 " for %s" % str(disk))
546 p_data = (pnode_info.secondary_ip, port)
547 s_data = (snode_info.secondary_ip, port)
548 if pnode == node_name:
549 disk.physical_id = p_data + s_data + (pminor, secret)
550 else:
551 disk.physical_id = s_data + p_data + (sminor, secret)
552 else:
553 disk.physical_id = disk.logical_id
554 return
555
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

611 """Compute the used DRBD minor/nodes.
612
613 @rtype: (dict, list)
614 @return: dictionary of node_name: dict of minor: instance_name;
615 the returned dict will have all the nodes in it (even if with
616 an empty list), and a list of duplicates; if the duplicates
617 list is not empty, the configuration is corrupted and its caller
618 should raise an exception
619
620 """
621 def _AppendUsedPorts(instance_name, disk, used):
622 duplicates = []
623 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
624 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
625 for node, port in ((node_a, minor_a), (node_b, minor_b)):
626 assert node in used, ("Node '%s' of instance '%s' not found"
627 " in node list" % (node, instance_name))
628 if port in used[node]:
629 duplicates.append((node, port, instance_name, used[node][port]))
630 else:
631 used[node][port] = instance_name
632 if disk.children:
633 for child in disk.children:
634 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
635 return duplicates
636
637 duplicates = []
638 my_dict = dict((node, {}) for node in self._config_data.nodes)
639 for instance in self._config_data.instances.itervalues():
640 for disk in instance.disks:
641 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
642 for (node, minor), instance in self._temporary_drbds.iteritems():
643 if minor in my_dict[node] and my_dict[node][minor] != instance:
644 duplicates.append((node, minor, instance, my_dict[node][minor]))
645 else:
646 my_dict[node][minor] = instance
647 return my_dict, duplicates
648
649 @locking.ssynchronized(_config_lock)
651 """Compute the used DRBD minor/nodes.
652
653 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
654
655 @return: dictionary of node_name: dict of minor: instance_name;
656 the returned dict will have all the nodes in it (even if with
657 an empty list).
658
659 """
660 d_map, duplicates = self._UnlockedComputeDRBDMap()
661 if duplicates:
662 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
663 str(duplicates))
664 return d_map
665
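  # Shape of the map returned by ComputeDRBDMap, as an illustrative sketch
  # (node and instance names made up):
  #
  #   {"node1.example.com": {0: "instance1", 1: "instance2"},
  #    "node2.example.com": {0: "instance1", 1: "instance2"},
  #    "node3.example.com": {}}
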
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result

723 """Release temporary drbd minors allocated for a given instance.
724
725 @type instance: string
726 @param instance: the instance for which temporary minors should be
727 released
728
729 """
730 assert isinstance(instance, basestring), \
731 "Invalid argument passed to ReleaseDRBDMinors"
732 for key, name in self._temporary_drbds.items():
733 if name == instance:
734 del self._temporary_drbds[key]
735
736 @locking.ssynchronized(_config_lock)
738 """Release temporary drbd minors allocated for a given instance.
739
740 This should be called on the error paths, on the success paths
741 it's automatically called by the ConfigWriter add and update
742 functions.
743
744 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
745
746 @type instance: string
747 @param instance: the instance for which temporary minors should be
748 released
749
750 """
751 self._UnlockedReleaseDRBDMinors(instance)
752
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

874 """Set the instance's status to a given value.
875
876 """
877 assert isinstance(status, bool), \
878 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
879
880 if instance_name not in self._config_data.instances:
881 raise errors.ConfigurationError("Unknown instance '%s'" %
882 instance_name)
883 instance = self._config_data.instances[instance_name]
884 if instance.admin_up != status:
885 instance.admin_up = status
886 instance.serial_no += 1
887 instance.mtime = time.time()
888 self._WriteConfig()
889
890 @locking.ssynchronized(_config_lock)
892 """Mark the instance status to up in the config.
893
894 """
895 self._SetInstanceStatus(instance_name, True)
896
897 @locking.ssynchronized(_config_lock)
899 """Remove the instance from the configuration.
900
901 """
902 if instance_name not in self._config_data.instances:
903 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
904 del self._config_data.instances[instance_name]
905 self._config_data.cluster.serial_no += 1
906 self._WriteConfig()
907
908 @locking.ssynchronized(_config_lock)
910 """Rename an instance.
911
912 This needs to be done in ConfigWriter and not by RemoveInstance
913 combined with AddInstance as only we can guarantee an atomic
914 rename.
915
916 """
917 if old_name not in self._config_data.instances:
918 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
919 inst = self._config_data.instances[old_name]
920 del self._config_data.instances[old_name]
921 inst.name = new_name
922
923 for disk in inst.disks:
924 if disk.dev_type == constants.LD_FILE:
925
926 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
927 disk_fname = "disk%s" % disk.iv_name.split("/")[1]
928 disk.physical_id = disk.logical_id = (disk.logical_id[0],
929 utils.PathJoin(file_storage_dir,
930 inst.name,
931 disk_fname))
932
933 self._config_data.instances[inst.name] = inst
934 self._WriteConfig()
935
936 @locking.ssynchronized(_config_lock)
938 """Mark the status of an instance to down in the configuration.
939
940 """
941 self._SetInstanceStatus(instance_name, False)
942
944 """Get the list of instances.
945
946 This function is for internal use, when the config lock is already held.
947
948 """
949 return self._config_data.instances.keys()
950
951 @locking.ssynchronized(_config_lock, shared=1)
953 """Get the list of instances.
954
955 @return: array of instances, ex. ['instance2.example.com',
956 'instance1.example.com']
957
958 """
959 return self._UnlockedGetInstanceList()
960
961 @locking.ssynchronized(_config_lock, shared=1)
963 """Attempt to expand an incomplete instance name.
964
965 """
966 return utils.MatchNameComponent(short_name,
967 self._config_data.instances.keys(),
968 case_sensitive=False)
969
971 """Returns information about an instance.
972
973 This function is for internal use, when the config lock is already held.
974
975 """
976 if instance_name not in self._config_data.instances:
977 return None
978
979 return self._config_data.instances[instance_name]
980
981 @locking.ssynchronized(_config_lock, shared=1)
983 """Returns information about an instance.
984
985 It takes the information from the configuration file. Other information of
986 an instance are taken from the live systems.
987
988 @param instance_name: name of the instance, e.g.
989 I{instance1.example.com}
990
991 @rtype: L{objects.Instance}
992 @return: the instance object
993
994 """
995 return self._UnlockedGetInstanceInfo(instance_name)
996
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

1050
1052 """Get the configuration of a node, as stored in the config.
1053
1054 This function is for internal use, when the config lock is already
1055 held.
1056
1057 @param node_name: the node name, e.g. I{node1.example.com}
1058
1059 @rtype: L{objects.Node}
1060 @return: the node object
1061
1062 """
1063 if node_name not in self._config_data.nodes:
1064 return None
1065
1066 return self._config_data.nodes[node_name]
1067
1068 @locking.ssynchronized(_config_lock, shared=1)
1070 """Get the configuration of a node, as stored in the config.
1071
1072 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1073
1074 @param node_name: the node name, e.g. I{node1.example.com}
1075
1076 @rtype: L{objects.Node}
1077 @return: the node object
1078
1079 """
1080 return self._UnlockedGetNodeInfo(node_name)
1081
1083 """Return the list of nodes which are in the configuration.
1084
1085 This function is for internal use, when the config lock is already
1086 held.
1087
1088 @rtype: list
1089
1090 """
1091 return self._config_data.nodes.keys()
1092
1093 @locking.ssynchronized(_config_lock, shared=1)
1095 """Return the list of nodes which are in the configuration.
1096
1097 """
1098 return self._UnlockedGetNodeList()
1099

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

1129 """Get the number of current and maximum desired and possible candidates.
1130
1131 @type exceptions: list
1132 @param exceptions: if passed, list of nodes that should be ignored
1133 @rtype: tuple
1134 @return: tuple of (current, desired and possible, possible)
1135
1136 """
1137 mc_now = mc_should = mc_max = 0
1138 for node in self._config_data.nodes.values():
1139 if exceptions and node.name in exceptions:
1140 continue
1141 if not (node.offline or node.drained):
1142 mc_max += 1
1143 if node.master_candidate:
1144 mc_now += 1
1145 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1146 return (mc_now, mc_should, mc_max)
1147
1148 @locking.ssynchronized(_config_lock, shared=1)
1150 """Get the number of current and maximum possible candidates.
1151
1152 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1153
1154 @type exceptions: list
1155 @param exceptions: if passed, list of nodes that should be ignored
1156 @rtype: tuple
1157 @return: tuple of (current, max)
1158
1159 """
1160 return self._UnlockedGetMasterCandidateStats(exceptions)
1161
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

1199 """Bump up the serial number of the config.
1200
1201 """
1202 self._config_data.serial_no += 1
1203 self._config_data.mtime = time.time()
1204
1206 """Returns all objects with uuid attributes.
1207
1208 """
1209 return (self._config_data.instances.values() +
1210 self._config_data.nodes.values() +
1211 [self._config_data.cluster])
1212
  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

1244 """Run upgrade steps that cannot be done purely in the objects.
1245
1246 This is because some data elements need uniqueness across the
1247 whole configuration, etc.
1248
1249 @warning: this function will call L{_WriteConfig()}, but also
1250 L{DropECReservations} so it needs to be called only from a
1251 "safe" place (the constructor). If one wanted to call it with
1252 the lock held, a DropECReservationUnlocked would need to be
1253 created first, to avoid causing deadlock.
1254
1255 """
1256 modified = False
1257 for item in self._AllUUIDObjects():
1258 if item.uuid is None:
1259 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1260 modified = True
1261 if modified:
1262 self._WriteConfig()
1263
1264
1265 self.DropECReservations(_UPGRADE_CONFIG_JID)
1266
1268 """Distribute the configuration to the other nodes.
1269
1270 Currently, this only copies the configuration file. In the future,
1271 it could be used to encapsulate the 2/3-phase update mechanism.
1272
1273 """
1274 if self._offline:
1275 return True
1276
1277 bad = False
1278
1279 node_list = []
1280 addr_list = []
1281 myhostname = self._my_hostname
1282
1283
1284
1285
1286 for node_name in self._UnlockedGetNodeList():
1287 if node_name == myhostname:
1288 continue
1289 node_info = self._UnlockedGetNodeInfo(node_name)
1290 if not node_info.master_candidate:
1291 continue
1292 node_list.append(node_info.name)
1293 addr_list.append(node_info.primary_ip)
1294
1295 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1296 address_list=addr_list)
1297 for to_node, to_result in result.items():
1298 msg = to_result.fail_msg
1299 if msg:
1300 msg = ("Copy of file %s to node %s failed: %s" %
1301 (self._cfg_file, to_node, msg))
1302 logging.error(msg)
1303
1304 if feedback_fn:
1305 feedback_fn(msg)
1306
1307 bad = True
1308
1309 return not bad
1310
  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

1361 """Return the values needed by ssconf.
1362
1363 @rtype: dict
1364 @return: a dictionary with keys the ssconf names and values their
1365 associated value
1366
1367 """
1368 fn = "\n".join
1369 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1370 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1371 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1372 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1373 for ninfo in node_info]
1374 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1375 for ninfo in node_info]
1376
1377 instance_data = fn(instance_names)
1378 off_data = fn(node.name for node in node_info if node.offline)
1379 on_data = fn(node.name for node in node_info if not node.offline)
1380 mc_data = fn(node.name for node in node_info if node.master_candidate)
1381 mc_ips_data = fn(node.primary_ip for node in node_info
1382 if node.master_candidate)
1383 node_data = fn(node_names)
1384 node_pri_ips_data = fn(node_pri_ips)
1385 node_snd_ips_data = fn(node_snd_ips)
1386
1387 cluster = self._config_data.cluster
1388 cluster_tags = fn(cluster.GetTags())
1389
1390 hypervisor_list = fn(cluster.enabled_hypervisors)
1391
1392 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1393
1394 return {
1395 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1396 constants.SS_CLUSTER_TAGS: cluster_tags,
1397 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1398 constants.SS_MASTER_CANDIDATES: mc_data,
1399 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1400 constants.SS_MASTER_IP: cluster.master_ip,
1401 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1402 constants.SS_MASTER_NODE: cluster.master_node,
1403 constants.SS_NODE_LIST: node_data,
1404 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1405 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1406 constants.SS_OFFLINE_NODES: off_data,
1407 constants.SS_ONLINE_NODES: on_data,
1408 constants.SS_INSTANCE_LIST: instance_data,
1409 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1410 constants.SS_HYPERVISOR_LIST: hypervisor_list,
1411 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1412 constants.SS_UID_POOL: uid_pool,
1413 }
1414
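  # Illustrative excerpt of the map built above (values are newline-joined
  # strings, names made up):
  #
  #   {constants.SS_CLUSTER_NAME: "cluster.example.com",
  #    constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #    constants.SS_ONLINE_NODES: "node1.example.com\nnode2.example.com"}
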
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

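  # Typical read-modify-write cycle around Update, as an illustrative sketch
  # (instance name made up):
  #
  #   cfg = ConfigWriter()
  #   inst = cfg.GetInstanceInfo("instance1.example.com")
  #   inst.admin_up = False
  #   cfg.Update(inst, feedback_fn=None)  # bumps serials and redistributes
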
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)