Package ganeti :: Module bootstrap

Source Code for Module ganeti.bootstrap

#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Functions to bootstrap a new cluster.

"""

import os
import os.path
import re
import logging
import time
import tempfile

from ganeti.cmdlib import cluster
import ganeti.rpc.node as rpc
from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import hypervisor
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import netutils
from ganeti import luxi
from ganeti import jstore
from ganeti import pathutils
from ganeti import runtime
from ganeti import vcluster


# ec_id for InitConfig's temporary reservation manager
_INITCONF_ECID = "initconfig-ecid"

#: After how many seconds daemon must be responsive
_DAEMON_READY_TIMEOUT = 10.0

def _InitSSHSetup():
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

def GenerateHmacKey(file_name):
  """Writes a new HMAC key.

  @type file_name: str
  @param file_name: Path to output file

  """
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
                  backup=True)


# pylint: disable=R0913
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
                          new_confd_hmac_key, new_cds, new_client_cert,
                          master_name,
                          rapi_cert_pem=None, spice_cert_pem=None,
                          spice_cacert_pem=None, cds=None,
                          nodecert_file=pathutils.NODED_CERT_FILE,
                          clientcert_file=pathutils.NODED_CLIENT_CERT_FILE,
                          rapicert_file=pathutils.RAPI_CERT_FILE,
                          spicecert_file=pathutils.SPICE_CERT_FILE,
                          spicecacert_file=pathutils.SPICE_CACERT_FILE,
                          hmackey_file=pathutils.CONFD_HMAC_KEY,
                          cds_file=pathutils.CLUSTER_DOMAIN_SECRET_FILE):
  """Updates the cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type new_client_cert: bool
  @param new_client_cert: Whether to generate a new client certificate
  @type master_name: string
  @param master_name: FQDN of the master node
  @type rapi_cert_pem: string
  @param rapi_cert_pem: New RAPI certificate in PEM format
  @type spice_cert_pem: string
  @param spice_cert_pem: New SPICE certificate in PEM format
  @type spice_cacert_pem: string
  @param spice_cacert_pem: Certificate of the CA that signed the SPICE
    certificate, in PEM format
  @type cds: string
  @param cds: New cluster domain secret
  @type nodecert_file: string
  @param nodecert_file: optional override of the node cert file path
  @type rapicert_file: string
  @param rapicert_file: optional override of the rapi cert file path
  @type spicecert_file: string
  @param spicecert_file: optional override of the spice cert file path
  @type spicecacert_file: string
  @param spicecacert_file: optional override of the spice CA cert file path
  @type hmackey_file: string
  @param hmackey_file: optional override of the hmac key file path

  """
  # pylint: disable=R0913
  # noded SSL certificate
  utils.GenerateNewSslCert(
    new_cluster_cert, nodecert_file, 1,
    "Generating new cluster certificate at %s" % nodecert_file)

  # If the cluster certificate was renewed, the client cert has to be
  # renewed and resigned.
  if new_cluster_cert or new_client_cert:
    utils.GenerateNewClientSslCert(clientcert_file, nodecert_file,
                                   master_name)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  if rapi_cert_pem:
    # Assume rapi_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  else:
    utils.GenerateNewSslCert(
      new_rapi_cert, rapicert_file, 1,
      "Generating new RAPI certificate at %s" % rapicert_file)

  # SPICE
  spice_cert_exists = os.path.exists(spicecert_file)
  spice_cacert_exists = os.path.exists(spicecacert_file)
  if spice_cert_pem:
    # spice_cert_pem implies also spice_cacert_pem
    logging.debug("Writing SPICE certificate at %s", spicecert_file)
    utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
    logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
    utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
  elif new_spice_cert or not spice_cert_exists:
    if spice_cert_exists:
      utils.CreateBackup(spicecert_file)
    if spice_cacert_exists:
      utils.CreateBackup(spicecacert_file)

    logging.debug("Generating new self-signed SPICE certificate at %s",
                  spicecert_file)
    (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file, 1)

    # Self-signed certificate -> the public certificate is also the CA public
    # certificate
    logging.debug("Writing the public certificate to %s",
                  spicecert_file)
    utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)

  # Cluster domain secret
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)

def _InitGanetiServerSetup(master_name, cfg):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster, generates the SSL certificate and starts the node daemon.

  @type master_name: str
  @param master_name: Name of the master node
  @type cfg: ConfigWriter
  @param cfg: the configuration writer

  """
  # Generate cluster secrets
  GenerateClusterCrypto(True, False, False, False, False, False, master_name)

  # Add the master's SSL certificate digest to the configuration.
  master_uuid = cfg.GetMasterNode()
  master_digest = utils.GetCertificateDigest()
  cfg.AddNodeToCandidateCerts(master_uuid, master_digest)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  ssconf.WriteSsconfFiles(cfg.GetSsconfValues())

  if not os.path.exists(
      os.path.join(pathutils.DATA_DIR,
                   "%s%s" % (constants.SSCONF_FILEPREFIX,
                             constants.SS_MASTER_CANDIDATES_CERTS))):
    raise errors.OpExecError("Ssconf file for master candidate certificates"
                             " was not written.")

  if not os.path.exists(pathutils.NODED_CERT_FILE):
    raise errors.OpExecError("The server certificate was not created"
                             " properly.")

  if not os.path.exists(pathutils.NODED_CLIENT_CERT_FILE):
    raise errors.OpExecError("The client certificate was not created"
                             " properly.")

  # set up the inter-node password and certificate
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.NODED])
  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForNodeDaemon(master_name)

def _WaitForNodeDaemon(node_name):
  """Wait for node daemon to become responsive.

  """
  def _CheckNodeDaemon():
    # Pylint bug <http://www.logilab.org/ticket/35642>
    # pylint: disable=E1101
    result = rpc.BootstrapRunner().call_version([node_name])[node_name]
    if result.fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" %
                             (node_name, _DAEMON_READY_TIMEOUT))

def _WaitForMasterDaemon():
  """Wait for master daemon to become responsive.

  """
  def _CheckMasterDaemon():
    try:
      cl = luxi.Client()
      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
    except Exception:
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", cluster_name)

  try:
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)

def _WaitForSshDaemon(hostname, port):
  """Wait for SSH daemon to become responsive.

  """
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  hostip = netutils.GetHostname(name=hostname, family=family).ip

  def _CheckSshDaemon():
    if netutils.TcpPing(hostip, port, timeout=1.0, live_port_needed=True):
      logging.debug("SSH daemon on %s:%s (IP address %s) has become"
                    " responsive", hostname, port, hostip)
    else:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckSshDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("SSH daemon on %s:%s (IP address %s) didn't"
                             " become responsive within %s seconds" %
                             (hostname, port, hostip, _DAEMON_READY_TIMEOUT))

def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
                    use_cluster_key, ask_key, strict_host_check,
                    port, data):
  """Runs a command to configure something on a remote machine.

  @type cluster_name: string
  @param cluster_name: Cluster name
  @type node: string
  @param node: Node name
  @type basecmd: string
  @param basecmd: Base command (path on the remote machine)
  @type debug: bool
  @param debug: Enable debug output
  @type verbose: bool
  @param verbose: Enable verbose output
  @type use_cluster_key: bool
  @param use_cluster_key: See L{ssh.SshRunner.BuildCmd}
  @type ask_key: bool
  @param ask_key: See L{ssh.SshRunner.BuildCmd}
  @type strict_host_check: bool
  @param strict_host_check: See L{ssh.SshRunner.BuildCmd}
  @type port: int
  @param port: The SSH port of the remote machine or None for the default
  @param data: JSON-serializable input data for script (passed to stdin)

  """
  cmd = [basecmd]

  # Pass --debug/--verbose to the external script if set on our invocation
  if debug:
    cmd.append("--debug")

  if verbose:
    cmd.append("--verbose")

  logging.debug("Node setup command: %s", cmd)

  version = constants.DIR_VERSION
  all_cmds = [["test", "-d", os.path.join(pathutils.PKGLIBDIR, version)]]
  if constants.HAS_GNU_LN:
    all_cmds.extend([["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  else:
    all_cmds.extend([["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  all_cmds.append(cmd)

  if port is None:
    port = netutils.GetDaemonPort(constants.SSH)

  srun = ssh.SshRunner(cluster_name)
  scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
                       utils.ShellQuoteArgs(
                           utils.ShellCombineCommands(all_cmds)),
                       batch=False, ask_key=ask_key, quiet=False,
                       strict_host_check=strict_host_check,
                       use_cluster_key=use_cluster_key,
                       port=port)

  tempfh = tempfile.TemporaryFile()
  try:
    tempfh.write(serializer.DumpJson(data))
    tempfh.seek(0)

    result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
  finally:
    tempfh.close()

  if result.failed:
    raise errors.OpExecError("Command '%s' failed: %s" %
                             (result.cmd, result.fail_reason))

  _WaitForSshDaemon(node, port)

def _InitFileStorageDir(file_storage_dir):
  """Initialize the file storage directory, if needed.

  @param file_storage_dir: the user-supplied value
  @return: either empty string (if file storage was disabled at build
      time) or the normalized path to the storage directory

  """
  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
                               " path" % file_storage_dir, errors.ECODE_INVAL)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)

  return file_storage_dir

def _PrepareFileBasedStorage(
    enabled_disk_templates, file_storage_dir,
    default_dir, file_disk_template, _storage_path_acceptance_fn,
    init_fn=_InitFileStorageDir, acceptance_fn=None):
  """Checks if a file-based storage type is enabled and inits the dir.

  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates
  @type file_storage_dir: string
  @param file_storage_dir: the file storage directory
  @type default_dir: string
  @param default_dir: default file storage directory when C{file_storage_dir}
      is 'None'
  @type file_disk_template: string
  @param file_disk_template: a disk template whose storage type is 'ST_FILE',
      'ST_SHARED_FILE' or 'ST_GLUSTER'
  @type _storage_path_acceptance_fn: function
  @param _storage_path_acceptance_fn: checks whether the given file-based
      storage directory is acceptable
  @see: C{cluster.CheckFileBasedStoragePathVsEnabledDiskTemplates} for details

  @rtype: string
  @returns: the name of the actual file storage directory

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER
         ))

  if file_storage_dir is None:
    file_storage_dir = default_dir
  if not acceptance_fn:
    acceptance_fn = \
        lambda path: filestorage.CheckFileStoragePathAcceptance(
            path, exact_match_ok=True)

  _storage_path_acceptance_fn(logging.warning, file_storage_dir,
                              enabled_disk_templates)

  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_enabled:
    try:
      acceptance_fn(file_storage_dir)
    except errors.FileStoragePathError as e:
      raise errors.OpPrereqError(str(e))
    result_file_storage_dir = init_fn(file_storage_dir)
  else:
    result_file_storage_dir = file_storage_dir
  return result_file_storage_dir

def _PrepareFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_FILE_STORAGE_DIR, constants.DT_FILE,
      cluster.CheckFileStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)

def _PrepareSharedFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if shared file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR, constants.DT_SHARED_FILE,
      cluster.CheckSharedFileStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)

def _PrepareGlusterStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if gluster storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_GLUSTER_STORAGE_DIR, constants.DT_GLUSTER,
      cluster.CheckGlusterStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)

def _InitCheckEnabledDiskTemplates(enabled_disk_templates):
  """Checks the sanity of the enabled disk templates.

  """
  if not enabled_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_disk_templates = \
    set(enabled_disk_templates) - constants.DISK_TEMPLATES
  if invalid_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list contains invalid"
                               " entries: %s" % invalid_disk_templates,
                               errors.ECODE_INVAL)

def _RestrictIpolicyToEnabledDiskTemplates(ipolicy, enabled_disk_templates):
  """Restricts the ipolicy's disk templates to the enabled ones.

  This function removes from the ipolicy's list of allowed disk templates
  those that are not enabled by the cluster.

  @type ipolicy: dict
  @param ipolicy: the instance policy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of cluster-wide enabled disk
    templates

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
  restricted_disk_templates = list(set(allowed_disk_templates)
                                   .intersection(set(enabled_disk_templates)))
  ipolicy[constants.IPOLICY_DTS] = restricted_disk_templates

def _InitCheckDrbdHelper(drbd_helper, drbd_enabled):
  """Checks the DRBD usermode helper.

  @type drbd_helper: string
  @param drbd_helper: name of the DRBD usermode helper that the system should
    use

  """
  if not drbd_enabled:
    return

  if drbd_helper is not None:
    try:
      curr_helper = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      raise errors.OpPrereqError("Error while checking drbd helper"
                                 " (disable drbd with --enabled-disk-templates"
                                 " if you are not using drbd): %s" % str(err),
                                 errors.ECODE_ENVIRON)
    if drbd_helper != curr_helper:
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
                                 " is the current helper" % (drbd_helper,
                                                             curr_helper),
                                 errors.ECODE_INVAL)

def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
                master_netmask, master_netdev, file_storage_dir,
                shared_file_storage_dir, gluster_storage_dir,
                candidate_pool_size, secondary_ip=None,
                vg_name=None, beparams=None, nicparams=None, ndparams=None,
                hvparams=None, diskparams=None, enabled_hypervisors=None,
                modify_etc_hosts=True, modify_ssh_setup=True,
                maintain_node_health=False, drbd_helper=None, uid_pool=None,
                default_iallocator=None, default_iallocator_params=None,
                primary_ip_version=None, ipolicy=None,
                prealloc_wipe_disks=False, use_external_mip_script=False,
                hv_state=None, disk_state=None, enabled_disk_templates=None,
                install_image=None, zeroing_image=None, compression_tools=None,
                enabled_user_shutdown=False):
  """Initialise the cluster.

  @type candidate_pool_size: int
  @param candidate_pool_size: master candidate pool size

  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of disk_templates to be used in this
    cluster

  @type enabled_user_shutdown: bool
  @param enabled_user_shutdown: whether user shutdown is enabled cluster
    wide

  """
  # TODO: complete the docstring
  if config.ConfigWriter.IsCluster():
    raise errors.OpPrereqError("Cluster is already initialised",
                               errors.ECODE_STATE)

  data_dir = vcluster.AddNodePrefix(pathutils.DATA_DIR)
  queue_dir = vcluster.AddNodePrefix(pathutils.QUEUE_DIR)
  archive_dir = vcluster.AddNodePrefix(pathutils.JOB_QUEUE_ARCHIVE_DIR)
  for ddir in [queue_dir, data_dir, archive_dir]:
    if os.path.isdir(ddir):
      for entry in os.listdir(ddir):
        if not os.path.isdir(os.path.join(ddir, entry)):
          raise errors.OpPrereqError(
            "%s contains non-directory entries like %s. Remove left-overs of"
            " an old cluster before initialising a new one" % (ddir, entry),
            errors.ECODE_STATE)

  if not enabled_hypervisors:
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                               " entries: %s" % invalid_hvs,
                               errors.ECODE_INVAL)

  _InitCheckEnabledDiskTemplates(enabled_disk_templates)

  try:
    ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
                               primary_ip_version, errors.ECODE_INVAL)

  hostname = netutils.GetHostname(family=ipcls.family)
  if not ipcls.IsValid(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
                               " address." % (hostname.ip, primary_ip_version),
                               errors.ECODE_INVAL)

  if ipcls.IsLoopback(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
                               " address. Please fix DNS or %s." %
                               (hostname.ip, pathutils.ETC_HOSTS),
                               errors.ECODE_ENVIRON)

  if not ipcls.Own(hostname.ip):
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                               " to %s,\nbut this ip address does not"
                               " belong to this host" %
                               hostname.ip, errors.ECODE_ENVIRON)

  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)

  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
    raise errors.OpPrereqError("Cluster IP already active",
                               errors.ECODE_NOTUNIQUE)

  if not secondary_ip:
    if primary_ip_version == constants.IP6_VERSION:
      raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                 " IPv4 address must be given as secondary",
                                 errors.ECODE_INVAL)
    secondary_ip = hostname.ip

  if not netutils.IP4Address.IsValid(secondary_ip):
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
                               " IPv4 address." % secondary_ip,
                               errors.ECODE_INVAL)

  if not netutils.IP4Address.Own(secondary_ip):
    raise errors.OpPrereqError("You gave %s as secondary IP,"
                               " but it does not belong to this host." %
                               secondary_ip, errors.ECODE_ENVIRON)

  if master_netmask is not None:
    if not ipcls.ValidateNetmask(master_netmask):
      raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s " %
                                 (master_netmask, primary_ip_version),
                                 errors.ECODE_INVAL)
  else:
    master_netmask = ipcls.iplen

  if vg_name:
    # Check if volume group is valid
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
                                          constants.MIN_VG_SIZE)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus, errors.ECODE_INVAL)

  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
  _InitCheckDrbdHelper(drbd_helper, drbd_enabled)

  logging.debug("Stopping daemons (if any are running)")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-all"])
  if result.failed:
    raise errors.OpExecError("Could not stop daemons, command %s"
                             " had exitcode %s and error '%s'" %
                             (result.cmd, result.exit_code, result.output))

  file_storage_dir = _PrepareFileStorage(enabled_disk_templates,
                                         file_storage_dir)
  shared_file_storage_dir = _PrepareSharedFileStorage(enabled_disk_templates,
                                                      shared_file_storage_dir)
  gluster_storage_dir = _PrepareGlusterStorage(enabled_disk_templates,
                                               gluster_storage_dir)

  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
                               errors.ECODE_INVAL)

  if not nicparams.get('mode', None) == constants.NIC_MODE_OVS:
    # Do not do this check if mode=openvswitch, since the openvswitch is not
    # created yet
    result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (master_netdev,
                                  result.output.strip()), errors.ECODE_INVAL)

  dirs = [(pathutils.RUN_DIR, constants.RUN_DIRS_MODE)]
  utils.EnsureDirs(dirs)

  objects.UpgradeBeParams(beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  objects.NIC.CheckParameterSyntax(nicparams)

  full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)
  _RestrictIpolicyToEnabledDiskTemplates(full_ipolicy, enabled_disk_templates)

  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
  else:
    ndparams = dict(constants.NDC_DEFAULTS)

  # This is ugly, as we modify the dict itself
  # FIXME: Make utils.ForceDictType pure functional or write a wrapper
  # around it
  if hv_state:
    for hvname, hvs_data in hv_state.items():
      utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
      hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
  else:
    hv_state = dict((hvname, constants.HVST_DEFAULTS)
                    for hvname in enabled_hypervisors)

  # FIXME: disk_state has no default values yet
  if disk_state:
    for storage, ds_data in disk_state.items():
      if storage not in constants.DS_VALID_TYPES:
        raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
                                   storage, errors.ECODE_INVAL)
      for ds_name, state in ds_data.items():
        utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
        ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)

  # hvparams is a mapping of hypervisor->hvparams dict
  for hv_name, hv_params in hvparams.iteritems():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
    hv_class = hypervisor.GetHypervisor(hv_name)
    hv_class.CheckParameterSyntax(hv_params)

  # diskparams is a mapping of disk-template->diskparams dict
  for template, dt_params in diskparams.items():
    param_keys = set(dt_params.keys())
    default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
    if not (param_keys <= default_param_keys):
      unknown_params = param_keys - default_param_keys
      raise errors.OpPrereqError("Invalid parameters for disk template %s:"
                                 " %s" % (template,
                                          utils.CommaJoin(unknown_params)),
                                 errors.ECODE_INVAL)
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
    if template == constants.DT_DRBD8 and vg_name is not None:
      # The default METAVG value is equal to the VG name set at init time,
      # if provided
      dt_params[constants.DRBD_DEFAULT_METAVG] = vg_name

  try:
    utils.VerifyDictOptions(diskparams, constants.DISK_DT_DEFAULTS)
  except errors.OpPrereqError, err:
    raise errors.OpPrereqError("While verifying diskparam options: %s" % err,
                               errors.ECODE_INVAL)

  # set up ssh config and /etc/hosts
  rsa_sshkey = ""
  dsa_sshkey = ""
  if os.path.isfile(pathutils.SSH_HOST_RSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_RSA_PUB)
    rsa_sshkey = sshline.split(" ")[1]
  if os.path.isfile(pathutils.SSH_HOST_DSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_DSA_PUB)
    dsa_sshkey = sshline.split(" ")[1]
  if not rsa_sshkey and not dsa_sshkey:
    raise errors.OpPrereqError("Failed to find SSH public keys",
                               errors.ECODE_ENVIRON)

  if modify_etc_hosts:
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)

  if modify_ssh_setup:
    _InitSSHSetup()

  if default_iallocator is not None:
    alloc_script = utils.FindFile(default_iallocator,
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                 " specified" % default_iallocator,
                                 errors.ECODE_INVAL)
  else:
    # default to htools
    if utils.FindFile(constants.IALLOC_HAIL,
                      constants.IALLOCATOR_SEARCH_PATH,
                      os.path.isfile):
      default_iallocator = constants.IALLOC_HAIL

  # check if we have all the users we need
  try:
    runtime.GetEnts()
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Required system user/group missing: %s" %
                               err, errors.ECODE_ENVIRON)

  candidate_certs = {}

  now = time.time()

  if compression_tools is not None:
    cluster.CheckCompressionTools(compression_tools)

  # init of cluster config file
  cluster_config = objects.Cluster(
    serial_no=1,
    rsahostkeypub=rsa_sshkey,
    dsahostkeypub=dsa_sshkey,
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
    mac_prefix=mac_prefix,
    volume_group_name=vg_name,
    tcpudp_port_pool=set(),
    master_ip=clustername.ip,
    master_netmask=master_netmask,
    master_netdev=master_netdev,
    cluster_name=clustername.name,
    file_storage_dir=file_storage_dir,
    shared_file_storage_dir=shared_file_storage_dir,
    gluster_storage_dir=gluster_storage_dir,
    enabled_hypervisors=enabled_hypervisors,
    beparams={constants.PP_DEFAULT: beparams},
    nicparams={constants.PP_DEFAULT: nicparams},
    ndparams=ndparams,
    hvparams=hvparams,
    diskparams=diskparams,
    candidate_pool_size=candidate_pool_size,
    modify_etc_hosts=modify_etc_hosts,
    modify_ssh_setup=modify_ssh_setup,
    uid_pool=uid_pool,
    ctime=now,
    mtime=now,
    maintain_node_health=maintain_node_health,
    drbd_usermode_helper=drbd_helper,
    default_iallocator=default_iallocator,
    default_iallocator_params=default_iallocator_params,
    primary_ip_family=ipcls.family,
    prealloc_wipe_disks=prealloc_wipe_disks,
    use_external_mip_script=use_external_mip_script,
    ipolicy=full_ipolicy,
    hv_state_static=hv_state,
    disk_state_static=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    candidate_certs=candidate_certs,
    osparams={},
    osparams_private_cluster={},
    install_image=install_image,
    zeroing_image=zeroing_image,
    compression_tools=compression_tools,
    enabled_user_shutdown=enabled_user_shutdown,
    )
  master_node_config = objects.Node(name=hostname.name,
                                    primary_ip=hostname.ip,
                                    secondary_ip=secondary_ip,
                                    serial_no=1,
                                    master_candidate=True,
                                    offline=False, drained=False,
                                    ctime=now, mtime=now,
                                    )
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
  cfg = config.ConfigWriter(offline=True)
  ssh.WriteKnownHostsFile(cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  ssconf.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  _InitGanetiServerSetup(hostname.name, cfg)

  logging.debug("Starting daemons")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
  if result.failed:
    raise errors.OpExecError("Could not start daemons, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForMasterDaemon()

def InitConfig(version, cluster_config, master_node_config,
               cfg_file=pathutils.CLUSTER_CONF_FILE):
  """Create the initial cluster configuration.

  It will contain the current node, which will also be the master
  node, and no instances.

  @type version: int
  @param version: configuration version
  @type cluster_config: L{objects.Cluster}
  @param cluster_config: cluster configuration
  @type master_node_config: L{objects.Node}
  @param master_node_config: master node configuration
  @type cfg_file: string
  @param cfg_file: configuration file path

  """
  uuid_generator = config.TemporaryReservationManager()
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                _INITCONF_ECID)
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                    _INITCONF_ECID)
  cluster_config.master_node = master_node_config.uuid
  nodes = {
    master_node_config.uuid: master_node_config,
    }
  default_nodegroup = objects.NodeGroup(
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
    name=constants.INITIAL_NODE_GROUP_NAME,
    members=[master_node_config.uuid],
    diskparams={},
    )
  nodegroups = {
    default_nodegroup.uuid: default_nodegroup,
    }
  now = time.time()
  config_data = objects.ConfigData(version=version,
                                   cluster=cluster_config,
                                   nodegroups=nodegroups,
                                   nodes=nodes,
                                   instances={},
                                   networks={},
                                   disks={},
                                   serial_no=1,
                                   ctime=now, mtime=now)
  utils.WriteFile(cfg_file,
                  data=serializer.Dump(config_data.ToDict()),
                  mode=0600)

def FinalizeClusterDestroy(master_uuid):
  """Execute the last steps of cluster destroy

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  """
  livelock = utils.livelock.LiveLock("bootstrap_destroy")
  cfg = config.GetConfig(None, livelock)
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
  runner = rpc.BootstrapRunner()

  master_name = cfg.GetNodeName(master_uuid)

  master_params = cfg.GetMasterNetworkParameters()
  master_params.uuid = master_uuid
  ems = cfg.GetUseExternalMipScript()
  result = runner.call_node_deactivate_master_ip(master_name, master_params,
                                                 ems)

  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master IP: %s", msg)

  result = runner.call_node_stop_master(master_name)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master role: %s", msg)

  result = runner.call_node_leave_cluster(master_name, modify_ssh_setup)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not shutdown the node daemon and cleanup"
                    " the node: %s", msg)

def SetupNodeDaemon(opts, cluster_name, node, ssh_port):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_port: the SSH port of the new node

  """
  data = {
    constants.NDS_CLUSTER_NAME: cluster_name,
    constants.NDS_NODE_DAEMON_CERTIFICATE:
      utils.ReadFile(pathutils.NODED_CERT_FILE),
    constants.NDS_SSCONF: ssconf.SimpleStore().ReadAll(),
    constants.NDS_START_NODE_DAEMON: True,
    constants.NDS_NODE_NAME: node,
    }

  RunNodeSetupCmd(cluster_name, node, pathutils.NODE_DAEMON_SETUP,
                  opts.debug, opts.verbose,
                  True, opts.ssh_key_check, opts.ssh_key_check,
                  ssh_port, data)

  _WaitForNodeDaemon(node)

def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the non-master to become
  new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
      (dangerous)

  @returns: the pair of an exit code and warnings to display
  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_names = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  if old_master == new_master:
    raise errors.OpPrereqError("This command must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ("\n".join(mc_no_master)),
                               errors.ECODE_STATE)

  if not no_voting:
    vote_list = GatherMasterVotes(node_names)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0
  warnings = []

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # Forcefully start WConfd so that we can access the configuration
    result = utils.RunCmd([pathutils.DAEMON_UTIL,
                           "start", constants.WCONFD, "--force-node",
                           "--no-voting", "--yes-do-it"])
    if result.failed:
      raise errors.OpPrereqError("Could not start the configuration daemon,"
                                 " command %s had exitcode %s and error %s" %
                                 (result.cmd, result.exit_code, result.output),
                                 errors.ECODE_NOENT)

    # instantiate a real config writer, as we now know we have the
    # configuration data
    livelock = utils.livelock.LiveLock("bootstrap_failover")
    cfg = config.GetConfig(None, livelock, accept_foreign=True)

    old_master_node = cfg.GetNodeInfoByName(old_master)
    if old_master_node is None:
      raise errors.OpPrereqError("Could not find old master node '%s' in"
                                 " cluster configuration." % old_master,
                                 errors.ECODE_NOENT)

    cluster_info = cfg.GetClusterInfo()
    new_master_node = cfg.GetNodeInfoByName(new_master)
    if new_master_node is None:
      raise errors.OpPrereqError("Could not find new master node '%s' in"
                                 " cluster configuration." % new_master,
                                 errors.ECODE_NOENT)

    cluster_info.master_node = new_master_node.uuid
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)

    # if cfg.Update worked, then it means the old master daemon won't be
    # able now to write its own config file (we rely on locking in both
    # backend.UploadFile() and ConfigWriter._Write(); hence the next
    # step is to kill the old master

    logging.info("Stopping the master daemon on node %s", old_master)

    runner = rpc.BootstrapRunner()
    master_params = cfg.GetMasterNetworkParameters()
    master_params.uuid = old_master_node.uuid
    ems = cfg.GetUseExternalMipScript()
    result = runner.call_node_deactivate_master_ip(old_master,
                                                   master_params, ems)

    msg = result.fail_msg
    if msg:
      warning = "Could not disable the master IP: %s" % (msg,)
      logging.warning("%s", warning)
      warnings.append(warning)

    result = runner.call_node_stop_master(old_master)
    msg = result.fail_msg
    if msg:
      warning = ("Could not disable the master role on the old master"
                 " %s, please disable manually: %s" % (old_master, msg))
      logging.error("%s", warning)
      warnings.append(warning)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1, warnings
  finally:
    # stop WConfd again:
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.WCONFD])
    if result.failed:
      warning = ("Could not stop the configuration daemon,"
                 " command %s had exitcode %s and error %s"
                 % (result.cmd, result.exit_code, result.output))
      logging.error("%s", warning)
      rcode = 1

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30

  # Here we have a phase where no master should be running
  def _check_ip(expected):
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT) != expected:
      raise utils.RetryAgain()

  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[False])
  except utils.RetryTimeout:
    warning = ("The master IP is still reachable after %s seconds,"
               " continuing but activating the master IP on the current"
               " node will probably fail" % total_timeout)
    logging.warning("%s", warning)
    warnings.append(warning)
    rcode = 1

  if jstore.CheckDrainFlag():
    logging.info("Undraining job queue")
    jstore.SetDrainFlag(False)

  logging.info("Starting the master daemons on the new master")

  result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
                                                                no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  # Finally verify that the new master managed to set up the master IP
  # and warn if it didn't.
  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[True])
  except utils.RetryTimeout:
    warning = ("The master IP did not come up within %s seconds; the"
               " cluster should still be working and reachable via %s,"
               " but not via the master IP address"
               % (total_timeout, new_master))
    logging.warning("%s", warning)
    warnings.append(warning)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode, warnings

def GetMaster():
  """Returns the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing ssconf directly, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  """
  sstore = ssconf.SimpleStore()

  old_master, _ = ssconf.GetMasterAndMyself(sstore)

  return old_master

def GatherMasterVotes(node_names):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  @type node_names: list
  @param node_names: the list of nodes to query for master info
  @rtype: list
  @return: list of (node, votes)

  """
  if not node_names:
    # no nodes
    return []
  results = rpc.BootstrapRunner().call_master_node_name(node_names)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_names))]
  votes = {}
  for node_name in results:
    nres = results[node_name]
    msg = nres.fail_msg

    if msg:
      logging.warning("Error contacting node %s: %s", node_name, msg)
      node = None
    else:
      node = nres.payload

    if node not in votes:
      votes[node] = 1
    else:
      votes[node] += 1

  vote_list = [v for v in votes.items()]
  # sort first on number of votes then on name, since we want None
  # sorted later if half of the nodes are not responding, and the other
  # half are all voting for the same master
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)

  return vote_list

def MajorityHealthy():
  """Check if the majority of nodes is healthy

  Gather master votes from all nodes known to this node;
  return True if a strict majority of nodes is reachable and
  has some opinion on which node is master. Note that this will
  not guarantee any node to win an election but it ensures that
  a standard master-failover is still possible.

  """
  node_names = ssconf.SimpleStore().GetNodeList()
  node_count = len(node_names)
  vote_list = GatherMasterVotes(node_names)
  if vote_list is None:
    return False
  total_votes = sum([count for (node, count) in vote_list if node is not None])
  logging.info("Total %d nodes, %d votes: %s", node_count, total_votes,
               vote_list)
  return 2 * total_votes > node_count
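
As an illustration of the strict-majority rule used by MajorityHealthy above, the following standalone sketch (not part of the module source) shows how the `2 * total_votes > node_count` test behaves; the vote-list format `(node, votes)` matches what GatherMasterVotes returns, while the sample data and the helper name `majority_reached` are invented for the example.

# Minimal sketch of the majority check; sample votes are made up and do not
# come from a real cluster.
def majority_reached(vote_list, node_count):
  # Count only votes from nodes that actually answered (node is not None).
  total_votes = sum(count for (node, count) in vote_list if node is not None)
  # Strict majority: more than half of all known nodes responded with a vote.
  return 2 * total_votes > node_count

# Example: 5 known nodes, 3 answered (all voting for "node1"), 2 unreachable.
print(majority_reached([("node1", 3), (None, 2)], 5))  # True (3 of 5)
print(majority_reached([("node1", 2), (None, 3)], 5))  # False (2 of 5)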