Package ganeti :: Module backend
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.backend

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Functions used by the node daemon 
  23   
  24  @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in 
  25       the L{UploadFile} function 
  26  @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted 
  27       in the L{_CleanDirectory} function 
  28   
  29  """ 
  30   
  31  # pylint: disable=E1103 
  32   
  33  # E1103: %s %r has no %r member (but some types could not be 
  34  # inferred), because the _TryOSFromDisk returns either (True, os_obj) 
  35  # or (False, "string") which confuses pylint 
  36   
  37   
  38  import os 
  39  import os.path 
  40  import shutil 
  41  import time 
  42  import stat 
  43  import errno 
  44  import re 
  45  import random 
  46  import logging 
  47  import tempfile 
  48  import zlib 
  49  import base64 
  50  import signal 
  51   
  52  from ganeti import errors 
  53  from ganeti import utils 
  54  from ganeti import ssh 
  55  from ganeti import hypervisor 
  56  from ganeti import constants 
  57  from ganeti import bdev 
  58  from ganeti import objects 
  59  from ganeti import ssconf 
  60  from ganeti import serializer 
  61  from ganeti import netutils 
  62  from ganeti import runtime 
  63  from ganeti import mcpu 
  64  from ganeti import compat 
  65   
  66   
  67  _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id" 
  68  _ALLOWED_CLEAN_DIRS = frozenset([ 
  69    constants.DATA_DIR, 
  70    constants.JOB_QUEUE_ARCHIVE_DIR, 
  71    constants.QUEUE_DIR, 
  72    constants.CRYPTO_KEYS_DIR, 
  73    ]) 
  74  _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60 
  75  _X509_KEY_FILE = "key" 
  76  _X509_CERT_FILE = "cert" 
  77  _IES_STATUS_FILE = "status" 
  78  _IES_PID_FILE = "pid" 
  79  _IES_CA_FILE = "ca" 
  80   
  81  #: Valid LVS output line regex 
  82  _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$") 
  83   
  84  # Actions for the master setup script 
  85  _MASTER_START = "start" 
  86  _MASTER_STOP = "stop" 
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  The single argument carries the human-readable error message that is
  reported back to the master daemon.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-formatted with C{args} if any
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # Logging is on by default; pass log=False to suppress it
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      # Include the current exception's traceback in the log record
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
def _GetConfig():
  """Thin wrapper constructing a fresh SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Thin wrapper constructing an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple)) and len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)
  if not os.path.isdir(path):
    return

  # Normalize the exclusion list so comparisons against joined paths work
  if exclude is None:
    excluded = []
  else:
    excluded = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in excluded:
      continue
    # Only plain files are removed; symlinks and subdirectories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the set of files accepted by L{UploadFile}.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    # Only the first element (node-local ancillary files) matters here
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


#: Files accepted in the L{UploadFile} function, built once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is preserved; everything else in the queue and
  archive directories is removed.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError as err:
    # _Fail always raises, so the return below is only reached on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator running hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      # These hooks run locally, so this node is both primary and secondary
      nodes = ([myself], [myself])

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ip_version = \
    netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  return {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ip_version),
    }
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  if use_external_mip_script:
    setup_script = constants.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = constants.DEFAULT_MASTER_SETUP_SCRIPT

  # The script runs with a clean environment (reset_env) containing only
  # the master IP variables
  result = utils.RunCmd([setup_script, action],
                        env=_BuildMasterIpEnv(master_params), reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START, use_external_mip_script)
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None
  @raise RPCFail: if the master daemons cannot be started

  """
  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    # _Fail logs the message itself; the previous explicit logging.error()
    # call caused the same message to be logged twice
    _Fail("Can't start Ganeti master: %s", result.output)
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP, use_external_mip_script)
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if result.failed:
    # Failure is logged only; this function never raises
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  def _IpAddrCmd(verb, mask):
    # Run "ip address <add|del> <ip>/<mask> dev <netdev> label <netdev>:0"
    return utils.RunCmd([constants.IP_COMMAND_PATH, "address", verb,
                         "%s/%s" % (master_ip, mask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])

  # Add the address under the new netmask first, then drop the old one,
  # so the master IP stays reachable throughout
  if _IpAddrCmd("add", netmask).failed:
    _Fail("Could not set the new netmask on the master IP address")

  if _IpAddrCmd("del", old_netmask).failed:
    _Fail("Could not bring down the master IP address with the old netmask")
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry
  @raise RPCFail: if the mode is unknown, or if the 'ip' parameter is
      missing (mode 'add') or unexpectedly present (mode 'remove')

  """
  # Bug fix: the original code merely constructed RPCFail(...) without
  # raising it, so the parameter checks were no-ops; _Fail() both logs
  # the error and raises RPCFail.
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    # Best-effort removal of the cluster secrets; a failure aborts the
    # remaining removals and is only logged
    for secret_file in (constants.CONFD_HMAC_KEY,
                        constants.RAPI_CERT_FILE,
                        constants.SPICE_CERT_FILE,
                        constants.SPICE_CACERT_FILE,
                        constants.NODED_CERT_FILE):
      utils.RemoveFile(secret_file)
  except:  # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
def _GetVgInfo(name):
  """Retrieves information about a LVM volume group.

  @type name: string
  @param name: the volume group name
  @rtype: dict
  @return: dictionary with keys "name", "vg_free" and "vg_size"; the
      sizes are None when the volume group cannot be found

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  if vginfo:
    # Round the reported sizes to whole MiB values
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }
def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()
560 561 -def _GetNamedNodeInfo(names, fn):
562 """Calls C{fn} for all names in C{names} and returns a dictionary. 563 564 @rtype: None or dict 565 566 """ 567 if names is None: 568 return None 569 else: 570 return map(fn, names)
571
def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  boot_id = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  return (boot_id,
          _GetNamedNodeInfo(vg_names, _GetVgInfo),
          _GetNamedNodeInfo(hv_names, _GetHvInfo))
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError as err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError as err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError as err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            # None denotes "this path is fine"
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail as err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail as err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError as err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError as err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      # "filesytem" typo in the original message fixed here
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    # Only paths below /dev are considered
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError as err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      # Floor division keeps the MiB size an int on both Python 2 and 3
      size = int(result.stdout) // (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
    empty for all volume groups
  @rtype: dict
  @return:
    dictionary of all partions (key) with value being a tuple of
    their size (in MiB), inactive and online status::

      {'xenvg/test1': ('20.06', True, True)}

    in case of errors, a string is returned with the error
    details.

  """
  lvs = {}
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for raw_line in result.stdout.splitlines():
    line = raw_line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    (vg_name, lv_name, size, attr) = match.groups()
    if attr[0] == "v":
      # we don't want to report such (virtual) volumes as existing,
      # since they don't really hold data
      continue
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    lvs[vg_name + "/" + lv_name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  # Plain delegation to the utils implementation
  return utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _ParseDevNames(devs_field):
    # Each entry looks like "/dev/sda1(0)": strip the "(offset)" suffix
    # and split multi-device entries on commas
    return [d.split("(")[0] for d in devs_field.split(",")]

  volumes = []
  for raw in result.stdout.splitlines():
    if raw.count("|") < 3:
      logging.warning("Strange line in the output from lvs: '%s'", raw)
      continue
    fields = [v.strip() for v in raw.split("|")]
    # One output entry per physical device the LV spans
    for dev in _ParseDevNames(fields[2]):
      volumes.append({"name": fields[0], "size": fields[1],
                      "dev": dev, "vg": fields[3]})
  return volumes
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: names of the bridges to check
  @rtype: None
  @return: None on success (the previous docstring incorrectly claimed a
      boolean return value)
  @raise RPCFail: if any of the bridges is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com
  @raise RPCFail: if any hypervisor fails to enumerate its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      results.extend(hypervisor.GetHypervisor(hname).ListInstances())
    except errors.HypervisorError as err:
      # Python 3 compatible "as" syntax instead of "except X, err"
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
    - memory: memory size of instance (int)
    - state: xen state of instance (string)
    - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}
  # iinfo layout: (name, id, memory, vcpus, state, time)
  return {
    "memory": iinfo[2],
    "state": iinfo[4],
    "time": iinfo[5],
    }
def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @rtype: None
  @return: None; problems are reported by raising L{RPCFail} (the previous
      docstring incorrectly documented a (result, description) tuple).
      Missing disk symlinks are only logged as warnings.
  @raise RPCFail: if the instance is not running on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  all_info = {}

  for hv_name in hypervisor_list:
    hv_data = hypervisor.GetHypervisor(hv_name).GetAllInstancesInfo()
    if not hv_data:
      continue
    for (name, _, memory, vcpus, state, times) in hv_data:
      entry = {
        "memory": memory,
        "vcpus": vcpus,
        "state": state,
        "time": times,
        }
      previous = all_info.get(name)
      if previous is not None:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ("memory", "vcpus"):
          if entry[key] != previous[key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      all_info[name] = entry

  return all_info
1070
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    # the component must not be able to escape the log directory
    assert "/" not in component
    component_part = "-%s" % component
  else:
    component_part = ""
  filename = ("%s-%s-%s%s-%s.log" %
              (kind, os_name, instance, component_part,
               utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, filename)
1098
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if not result.failed:
    return

  # the script failed: log the details and report the tail of its log
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1130
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS script is to be run
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if the rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    # fixed copy-paste bug: this message used to say "os create command"
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1163
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for a given instance disk.

  @type instance_name: string
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the disk index
  @rtype: string
  @return: absolute path of the disk symlink

  """
  link_basename = "%s%s%d" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
1168
1169 1170 -def _SymlinkBlockDev(instance_name, device_path, idx):
1171 """Set up symlinks to a instance's block device. 1172 1173 This is an auxiliary function run when an instance is start (on the primary 1174 node) or when an instance is migrated (on the target node). 1175 1176 1177 @param instance_name: the name of the target instance 1178 @param device_path: path of the physical block device, on the node 1179 @param idx: the disk index 1180 @return: absolute path to the disk's symlink 1181 1182 """ 1183 link_name = _GetBlockDevSymlinkPath(instance_name, idx) 1184 try: 1185 os.symlink(device_path, link_name) 1186 except OSError, err: 1187 if err.errno == errno.EEXIST: 1188 if (not os.path.islink(link_name) or 1189 os.readlink(link_name) != device_path): 1190 os.remove(link_name) 1191 os.symlink(device_path, link_name) 1192 else: 1193 raise 1194 1195 return link_name
1196 1209
1210 1211 -def _GatherAndLinkBlockDevs(instance):
1212 """Set up an instance's block device(s). 1213 1214 This is run on the primary node at instance startup. The block 1215 devices must be already assembled. 1216 1217 @type instance: L{objects.Instance} 1218 @param instance: the instance whose disks we shoul assemble 1219 @rtype: list 1220 @return: list of (disk_object, device_path) 1221 1222 """ 1223 block_devices = [] 1224 for idx, disk in enumerate(instance.disks): 1225 device = _RecursiveFindBD(disk) 1226 if device is None: 1227 raise errors.BlockDeviceError("Block device '%s' is not set up." % 1228 str(disk)) 1229 device.Open() 1230 try: 1231 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx) 1232 except OSError, e: 1233 raise errors.BlockDeviceError("Cannot create block device symlink: %s" % 1234 e.strerror) 1235 1236 block_devices.append((disk, link_name)) 1237 1238 return block_devices
1239
def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    # starting an already-running instance is a no-op, not an error
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # the hypervisor failed after the disk links were created, so
    # remove them again before reporting the failure
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
1266
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    # shutting down a non-running instance is a no-op
    logging.info("Instance %s not running, doing nothing", iname)
    return

  # Helper callable for utils.Retry: attempts a soft stop and raises
  # RetryAgain while the instance is still visible on the hypervisor
  class _TryShutdown:
    def __init__(self):
      # set after the first attempt, so the hypervisor can be asked
      # to retry on subsequent calls
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    # poll every 5 seconds until the soft shutdown succeeds or the
    # caller-supplied timeout expires
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to drop the instance from its list
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    # cleanup failures are non-fatal: the instance is already down
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
1336 1337 -def InstanceReboot(instance, reboot_type, shutdown_timeout):
1338 """Reboot an instance. 1339 1340 @type instance: L{objects.Instance} 1341 @param instance: the instance object to reboot 1342 @type reboot_type: str 1343 @param reboot_type: the type of reboot, one the following 1344 constants: 1345 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the 1346 instance OS, do not recreate the VM 1347 - L{constants.INSTANCE_REBOOT_HARD}: tear down and 1348 restart the VM (at the hypervisor level) 1349 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is 1350 not accepted here, since that mode is handled differently, in 1351 cmdlib, and translates into full stop and start of the 1352 instance (instead of a call_instance_reboot RPC) 1353 @type shutdown_timeout: integer 1354 @param shutdown_timeout: maximum timeout for soft shutdown 1355 @rtype: None 1356 1357 """ 1358 running_instances = GetInstanceList([instance.hypervisor]) 1359 1360 if instance.name not in running_instances: 1361 _Fail("Cannot reboot instance %s that is not running", instance.name) 1362 1363 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1364 if reboot_type == constants.INSTANCE_REBOOT_SOFT: 1365 try: 1366 hyper.RebootInstance(instance) 1367 except errors.HypervisorError, err: 1368 _Fail("Failed to soft reboot instance %s: %s", instance.name, err) 1369 elif reboot_type == constants.INSTANCE_REBOOT_HARD: 1370 try: 1371 InstanceShutdown(instance, shutdown_timeout) 1372 return StartInstance(instance, False) 1373 except errors.HypervisorError, err: 1374 _Fail("Failed to hard reboot instance %s: %s", instance.name, err) 1375 else: 1376 _Fail("Invalid reboot_type received: %s", reboot_type)
1377
1378 1379 -def InstanceBalloonMemory(instance, memory):
1380 """Resize an instance's memory. 1381 1382 @type instance: L{objects.Instance} 1383 @param instance: the instance object 1384 @type memory: int 1385 @param memory: new memory amount in MB 1386 @rtype: None 1387 1388 """ 1389 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1390 running = hyper.ListInstances() 1391 if instance.name not in running: 1392 logging.info("Instance %s is not running, cannot balloon", instance.name) 1393 return 1394 try: 1395 hyper.BalloonInstanceMemory(instance, memory) 1396 except errors.HypervisorError, err: 1397 _Fail("Failed to balloon instance memory: %s", err, exc=True)
1398
1399 1400 -def MigrationInfo(instance):
1401 """Gather information about an instance to be migrated. 1402 1403 @type instance: L{objects.Instance} 1404 @param instance: the instance definition 1405 1406 """ 1407 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1408 try: 1409 info = hyper.MigrationInfo(instance) 1410 except errors.HypervisorError, err: 1411 _Fail("Failed to fetch migration information: %s", err, exc=True) 1412 return info
1413
1414 1415 -def AcceptInstance(instance, info, target):
1416 """Prepare the node to accept an instance. 1417 1418 @type instance: L{objects.Instance} 1419 @param instance: the instance definition 1420 @type info: string/data (opaque) 1421 @param info: migration information, from the source node 1422 @type target: string 1423 @param target: target host (usually ip), on this node 1424 1425 """ 1426 # TODO: why is this required only for DTS_EXT_MIRROR? 1427 if instance.disk_template in constants.DTS_EXT_MIRROR: 1428 # Create the symlinks, as the disks are not active 1429 # in any way 1430 try: 1431 _GatherAndLinkBlockDevs(instance) 1432 except errors.BlockDeviceError, err: 1433 _Fail("Block device error: %s", err, exc=True) 1434 1435 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1436 try: 1437 hyper.AcceptInstance(instance, info, target) 1438 except errors.HypervisorError, err: 1439 if instance.disk_template in constants.DTS_EXT_MIRROR: 1440 _RemoveBlockDevLinks(instance.name, instance.disks) 1441 _Fail("Failed to accept instance: %s", err, exc=True)
1442
1443 1444 -def FinalizeMigrationDst(instance, info, success):
1445 """Finalize any preparation to accept an instance. 1446 1447 @type instance: L{objects.Instance} 1448 @param instance: the instance definition 1449 @type info: string/data (opaque) 1450 @param info: migration information, from the source node 1451 @type success: boolean 1452 @param success: whether the migration was a success or a failure 1453 1454 """ 1455 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1456 try: 1457 hyper.FinalizeMigrationDst(instance, info, success) 1458 except errors.HypervisorError, err: 1459 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1460
1461 1462 -def MigrateInstance(instance, target, live):
1463 """Migrates an instance to another node. 1464 1465 @type instance: L{objects.Instance} 1466 @param instance: the instance definition 1467 @type target: string 1468 @param target: the target node name 1469 @type live: boolean 1470 @param live: whether the migration should be done live or not (the 1471 interpretation of this parameter is left to the hypervisor) 1472 @raise RPCFail: if migration fails for some reason 1473 1474 """ 1475 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1476 1477 try: 1478 hyper.MigrateInstance(instance, target, live) 1479 except errors.HypervisorError, err: 1480 _Fail("Failed to migrate instance: %s", err, exc=True)
1481
1482 1483 -def FinalizeMigrationSource(instance, success, live):
1484 """Finalize the instance migration on the source node. 1485 1486 @type instance: L{objects.Instance} 1487 @param instance: the instance definition of the migrated instance 1488 @type success: bool 1489 @param success: whether the migration succeeded or not 1490 @type live: bool 1491 @param live: whether the user requested a live migration or not 1492 @raise RPCFail: If the execution fails for some reason 1493 1494 """ 1495 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1496 1497 try: 1498 hyper.FinalizeMigrationSource(instance, success, live) 1499 except Exception, err: # pylint: disable=W0703 1500 _Fail("Failed to finalize the migration on the source node: %s", err, 1501 exc=True)
1502
1503 1504 -def GetMigrationStatus(instance):
1505 """Get the migration status 1506 1507 @type instance: L{objects.Instance} 1508 @param instance: the instance that is being migrated 1509 @rtype: L{objects.MigrationStatus} 1510 @return: the status of the current migration (one of 1511 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional 1512 progress info that can be retrieved from the hypervisor 1513 @raise RPCFail: If the migration status cannot be retrieved 1514 1515 """ 1516 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1517 try: 1518 return hyper.GetMigrationStatus(instance) 1519 except Exception, err: # pylint: disable=W0703 1520 _Fail("Failed to get migration status: %s", err, exc=True)
1521
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    # assemble (and possibly open) all child devices first, since
    # creating the parent device may require them
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # register the device in the cache so later operations can find it
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
1584
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # note: dd works in units of WIPE_BLOCK_SIZE; seek/count are given
  # in those units
  dd_cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
            "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct",
            "of=%s" % path, "count=%d" % size]

  result = utils.RunCmd(dd_cmd)
  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
1602
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    found = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    found = None

  if not found:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # cross-verify the parameters against the actual device size
  if offset > found.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > found.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(found.dev_path, offset, size)
1630
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: whether to pause or to resume the sync
  @rtype: list
  @return: one (status, error-or-None) tuple per disk

  """
  results = []
  for disk in disks:
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      found = None

    if not found:
      results.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    ok = found.PauseResumeSync(pause)
    if ok:
      results.append((ok, None))
      continue

    if pause:
      action = "Pause"
    else:
      action = "Resume"
    results.append((ok, "%s for device %s failed" % (action, disk.iv_name)))

  return results
1665
1666 1667 -def BlockdevRemove(disk):
1668 """Remove a block device. 1669 1670 @note: This is intended to be called recursively. 1671 1672 @type disk: L{objects.Disk} 1673 @param disk: the disk object we should remove 1674 @rtype: boolean 1675 @return: the success of the operation 1676 1677 """ 1678 msgs = [] 1679 try: 1680 rdev = _RecursiveFindBD(disk) 1681 except errors.BlockDeviceError, err: 1682 # probably can't attach 1683 logging.info("Can't attach to device %s in remove", disk) 1684 rdev = None 1685 if rdev is not None: 1686 r_path = rdev.dev_path 1687 try: 1688 rdev.Remove() 1689 except errors.BlockDeviceError, err: 1690 msgs.append(str(err)) 1691 if not msgs: 1692 DevCacheManager.RemoveCache(r_path) 1693 1694 if disk.children: 1695 for child in disk.children: 1696 try: 1697 BlockdevRemove(child) 1698 except RPCFail, err: 1699 msgs.append(str(err)) 1700 1701 if msgs: 1702 _Fail("; ".join(msgs))
1703
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # compute how many children are allowed to fail activation while
    # the device can still be assembled (e.g. a degraded mirror)
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          # too many children already failed: propagate the error
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; report success to the caller
    result = True
  return result
1756
1757 1758 -def BlockdevAssemble(disk, owner, as_primary, idx):
1759 """Activate a block device for an instance. 1760 1761 This is a wrapper over _RecursiveAssembleBD. 1762 1763 @rtype: str or boolean 1764 @return: a C{/dev/...} path for primary nodes, and 1765 C{True} for secondary nodes 1766 1767 """ 1768 try: 1769 result = _RecursiveAssembleBD(disk, owner, as_primary) 1770 if isinstance(result, bdev.BlockDev): 1771 # pylint: disable=E1103 1772 result = result.dev_path 1773 if as_primary: 1774 _SymlinkBlockDev(owner, result, idx) 1775 except errors.BlockDeviceError, err: 1776 _Fail("Error while assembling disk: %s", err, exc=True) 1777 except OSError, err: 1778 _Fail("Error while symlinking disk: %s", err, exc=True) 1779 1780 return result
1781
1782 1783 -def BlockdevShutdown(disk):
1784 """Shut down a block device. 1785 1786 First, if the device is assembled (Attach() is successful), then 1787 the device is shutdown. Then the children of the device are 1788 shutdown. 1789 1790 This function is called recursively. Note that we don't cache the 1791 children or such, as oppossed to assemble, shutdown of different 1792 devices doesn't require that the upper device was active. 1793 1794 @type disk: L{objects.Disk} 1795 @param disk: the description of the disk we should 1796 shutdown 1797 @rtype: None 1798 1799 """ 1800 msgs = [] 1801 r_dev = _RecursiveFindBD(disk) 1802 if r_dev is not None: 1803 r_path = r_dev.dev_path 1804 try: 1805 r_dev.Shutdown() 1806 DevCacheManager.RemoveCache(r_path) 1807 except errors.BlockDeviceError, err: 1808 msgs.append(str(err)) 1809 1810 if disk.children: 1811 for child in disk.children: 1812 try: 1813 BlockdevShutdown(child) 1814 except RPCFail, err: 1815 msgs.append(str(err)) 1816 1817 if msgs: 1818 _Fail("; ".join(msgs))
1819
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  mirror_dev = _RecursiveFindBD(parent_cdev)
  if mirror_dev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  children = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in children:
    # at least one child could not be located on this node
    _Fail("Can't find new device(s) to add: %s:%s", children, new_cdevs)
  mirror_dev.AddChildren(children)
1838
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      # sanity-check the path the disk object reported
      if not utils.IsNormAbsPath(static_path):
        _Fail("Strange path returned from StaticDevPath: '%s'", static_path)
      paths.append(static_path)
      continue
    # no static path: locate the device to learn its current path
    child_bd = _RecursiveFindBD(disk)
    if child_bd is None:
      _Fail("Can't find device %s while removing children", disk)
    paths.append(child_bd.dev_path)
  parent.RemoveChildren(paths)
1867
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise RPCFail: if any of the disks cannot be found

  """
  status_list = []
  for dsk in disks:
    found = _RecursiveFindBD(dsk)
    if found is None:
      _Fail("Can't find device %s", dsk)
    status_list.append(found.CombinedSyncStatus())
  return status_list
1889
1890 1891 -def BlockdevGetmirrorstatusMulti(disks):
1892 """Get the mirroring status of a list of devices. 1893 1894 @type disks: list of L{objects.Disk} 1895 @param disks: the list of disks which we should query 1896 @rtype: disk 1897 @return: List of tuples, (bool, status), one for each disk; bool denotes 1898 success/failure, status is L{objects.BlockDevStatus} on success, string 1899 otherwise 1900 1901 """ 1902 result = [] 1903 for disk in disks: 1904 try: 1905 rbd = _RecursiveFindBD(disk) 1906 if rbd is None: 1907 result.append((False, "Can't find device %s" % disk)) 1908 continue 1909 1910 status = rbd.CombinedSyncStatus() 1911 except errors.BlockDeviceError, err: 1912 logging.exception("Error while getting disk status") 1913 result.append((False, str(err))) 1914 else: 1915 result.append((True, status)) 1916 1917 assert len(disks) == len(result) 1918 1919 return result
1920
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first, so the device itself can be located
  if disk.children:
    child_devs = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    child_devs = []

  return bdev.FindDevice(disk, child_devs)
1940
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened device instance
  @raise RPCFail: if the device is not set up

  """
  dev = _RecursiveFindBD(disk)
  if dev is None:
    _Fail("Block device '%s' is not set up", disk)

  dev.Open()
  return dev
1956
1957 1958 -def BlockdevFind(disk):
1959 """Check if a device is activated. 1960 1961 If it is, return information about the real device. 1962 1963 @type disk: L{objects.Disk} 1964 @param disk: the disk to find 1965 @rtype: None or objects.BlockDevStatus 1966 @return: None if the disk cannot be found, otherwise a the current 1967 information 1968 1969 """ 1970 try: 1971 rbd = _RecursiveFindBD(disk) 1972 except errors.BlockDeviceError, err: 1973 _Fail("Failed to find device: %s", err, exc=True) 1974 1975 if rbd is None: 1976 return None 1977 1978 return rbd.GetSyncStatus()
1979
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # lookup errors are reported as "unknown size", like missing devices
      found = None
    if found is None:
      sizes.append(None)
    else:
      sizes.append(found.GetActualSize())
  return sizes
2005
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # the receiving side runs over ssh, as the configured ganeti user
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # run through bash explicitly, since the pipeline relies on pipefail
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
2049
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file (possibly compressed, see
      L{_Decompress})
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None
  @raise RPCFail: if the file name or ownership arguments are invalid

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # Validate the ownership arguments before decompressing the payload,
  # so malformed requests fail fast and we never decompress data that
  # will not be written anyway
  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  raw_data = _Decompress(data)

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
2093
def RunOob(oob_program, command, node, timeout):
  """Run an out-of-band program with a command against a node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  res = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if not res.failed:
    return res.stdout

  _Fail("'%s' failed with reason '%s'; output: %s", res.cmd,
        res.fail_reason, res.output)
2114
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Thin wrapper around L{ssconf.SimpleStore.WriteFiles}.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
2123
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  # reject symlinked directories, devices etc. as version files
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    # the file holds one integer version number per line
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
2163
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): this `break` aborts the scan of *all* remaining
        # search directories, not just the failing one -- confirm this
        # is the intended behavior
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # on failure, os_inst is the error message, not an OS object
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result
2211
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: str
  @param name: the OS name to look up
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
      Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  # variants are optional from API v15 on
  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  # parameters and the verify script only exist from API v20 on
  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  # note: safe in Python 2 because items() returns a list snapshot,
  # even though the dict is modified during the loop
  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      # a missing optional file is simply dropped from the dict
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    # all OS scripts must be executable by their owner
    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # each line is "name help-text"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
2310
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
      Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip any "+variant" suffix before the on-disk lookup
  status, payload = _TryOSFromDisk(objects.OS.GetName(name), base_dir)

  if status:
    return payload

  _Fail(payload)
2337
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  # use the highest API version both sides understand
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))

  env = {
    "OS_API_VERSION": "%d" % api_version,
    "OS_NAME": inst_os.name,
    "DEBUG_LEVEL": "%d" % debug,
  }

  # OS variant: only meaningful from API v15 on, defaulting to the
  # first supported variant when the name doesn't carry one
  variant = ""
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name) or inst_os.supported_variants[0]
  env["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    env["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  env["PATH"] = constants.HOOKS_PATH

  return env
2382
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  # start from the OS-level environment and add instance specifics
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  # simple instance attributes, exported verbatim as INSTANCE_*
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    # NIC_*_BRIDGE is kept for backwards compatibility; NIC_*_LINK is
    # the generic replacement
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2444
def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: None
  @raise RPCFail: if the device cannot be found or the grow operation
      fails (errors are reported via L{_Fail}, not via a return value)

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
2472
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)
  @raise RPCFail: for non-snapshottable device types or missing devices

  """
  if disk.dev_type == constants.LD_LV:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return bdev.Snapshot(disk.size)

  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # recurse into the data device backing the DRBD disk
    return BlockdevSnapshot(disk.children[0])

  _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
        disk.unique_id, disk.dev_type)
2502
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # build the export in a ".new" directory and move it into place at
  # the end, so a partial write never shadows an existing export
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  # only disks with a valid snapshot are recorded; gaps keep their
  # original index while disk_count reflects actual entries
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
2588
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the export's configuration, as
      produced by L{objects.SerializableConfigParser.Dumps}
  @raise RPCFail: if the mandatory config sections are missing

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance section must be present for the
  # export to be usable
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2611
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")

  return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2624
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None
  @raise RPCFail: if removing the export tree fails

  """
  export_path = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(export_path)
  except EnvironmentError as err:
    _Fail("Error while removing the export: %s", err, exc=True)
2640
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the logical/physical id we rename it to
  @rtype: None
  @raise RPCFail: if any rename failed; all devices are attempted
      first and the accumulated errors are reported together

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the cached entry is now stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
2681
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")

  cfg = _GetConfig()
  norm_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()

  allowed = (utils.IsBelowDir(base_fstore, norm_dir) or
             utils.IsBelowDir(base_shared, norm_dir))
  if not allowed:
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          norm_dir, base_fstore, base_shared)

  return norm_dir
2708
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0o750)
    except OSError as err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
  elif not os.path.isdir(file_storage_dir):
    # the path exists but is something else than a directory
    _Fail("Specified storage dir '%s' is not a directory",
          file_storage_dir)
2732
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  # a missing directory counts as already removed
  if not os.path.exists(file_storage_dir):
    return

  if not os.path.isdir(file_storage_dir):
    _Fail("Specified Storage directory '%s' is not a directory",
          file_storage_dir)

  # deletes dir only if empty, otherwise we want to fail the rpc call
  try:
    os.rmdir(file_storage_dir)
  except OSError as err:
    _Fail("Cannot remove file storage directory '%s': %s",
          file_storage_dir, err)
2757
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if the source is not a directory, the destination
      already exists, or the rename itself fails

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    # note: if neither path exists this silently succeeds (idempotent
    # behavior when the rename already happened)
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
2787
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)

  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2804
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (possibly compressed, see
      L{_Decompress})
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()
  raw_content = _Decompress(content)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=raw_content,
                  uid=getents.masterd_uid, gid=getents.masterd_gid)
2826
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either path is outside the queue directory

  """
  for path in (old, new):
    _EnsureJobQueueFile(path)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0o700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2848
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if any disk cannot be found, or if closing any of
      them fails (all close errors are collected and reported together)

  """
  # resolve all devices up-front so we fail before touching any of them
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    # only clean up the symlinks when every device closed successfully
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)
2885
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None
  @raise RPCFail: if validation fails

  """
  try:
    hypervisor.GetHypervisor(hvname).ValidateParameters(hvparams)
  except errors.HypervisorError as err:
    _Fail(str(err), log=False)
2902
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check
  @raise RPCFail: if any parameter is not supported

  """
  # supported_parameters holds (name, help) pairs; only names matter here
  known = [name for (name, _) in os_obj.supported_parameters]
  unknown = frozenset(parameters).difference(known)
  if unknown:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unknown)))
2918
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      # on failure, tbv is the error message
      _Fail(tbv)
    else:
      return False

  # OSes older than API v20 have no parameters or verify script
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2966
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @raise RPCFail: if this node appears to be the master, the master
      daemon is still running, or backing up the config fails

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  check = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not check.failed:
    _Fail("The master daemon is running, will not demote")

  # keep a backup of the configuration before removing it
  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError as err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2989
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: Directory containing the certificate directories
  @type name: string
  @param name: Certificate name
  @rtype: tuple; (string, string, string)
  @return: Certificate directory, key file path and certificate file path

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  key_file = utils.PathJoin(cryptodir, name, _X509_KEY_FILE)
  cert_file = utils.PathJoin(cryptodir, name, _X509_CERT_FILE)
  return (cert_dir, key_file, cert_file)
2998
2999 3000 -def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
3001 """Creates a new X509 certificate for SSL/TLS. 3002 3003 @type validity: int 3004 @param validity: Validity in seconds 3005 @rtype: tuple; (string, string) 3006 @return: Certificate name and public part 3007 3008 """ 3009 (key_pem, cert_pem) = \ 3010 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(), 3011 min(validity, _MAX_SSL_CERT_VALIDITY)) 3012 3013 cert_dir = tempfile.mkdtemp(dir=cryptodir, 3014 prefix="x509-%s-" % utils.TimestampForFilename()) 3015 try: 3016 name = os.path.basename(cert_dir) 3017 assert len(name) > 5 3018 3019 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name) 3020 3021 utils.WriteFile(key_file, mode=0400, data=key_pem) 3022 utils.WriteFile(cert_file, mode=0400, data=cert_pem) 3023 3024 # Never return private key as it shouldn't leave the node 3025 return (name, cert_pem) 3026 except Exception: 3027 shutil.rmtree(cert_dir, ignore_errors=True) 3028 raise
3029
3030 3031 -def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
3032 """Removes a X509 certificate. 3033 3034 @type name: string 3035 @param name: Certificate name 3036 3037 """ 3038 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name) 3039 3040 utils.RemoveFile(key_file) 3041 utils.RemoveFile(cert_file) 3042 3043 try: 3044 os.rmdir(cert_dir) 3045 except EnvironmentError, err: 3046 _Fail("Cannot remove certificate directory '%s': %s", 3047 cert_dir, err)
3048
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode; one of C{constants.IEM_IMPORT} or
      C{constants.IEM_EXPORT}
  @param ieio: Input/output type; one of C{constants.IEIO_FILE},
      C{constants.IEIO_RAW_DISK} or C{constants.IEIO_SCRIPT}
  @param ieargs: Input/output arguments; contents depend on C{ieio}: a
      filename for C{IEIO_FILE}, a disk object for C{IEIO_RAW_DISK}, or a
      (disk, disk_index) pair for C{IEIO_SCRIPT}
  @return: tuple of (env, prefix, suffix, exp_size); entries not relevant
      for the requested type/mode are C{None}

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  # Defaults; only what's relevant for the requested type/mode is filled in
  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    # Resolve symlinks before checking the file lies below the exports dir
    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # Import writes into the file, export reads from it
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size; a stat(2) failure is only logged, the expected
    # size simply stays unknown
    try:
      st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
    else:
      exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering,
      # more than 64-128k will mostly be ignored; we use nocreat to fail if
      # the device is not already there or we pass a wrong path; we use
      # notrunc to not attempt truncation on an LV device; we use
      # oflag=dsync to not buffer too much memory; this means that at best,
      # we flush every 64k, which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    # The OS import/export script receives device path and index through
    # the environment
    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
3159
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: Prefix for the directory name
  @rtype: string
  @return: Path of the newly created status directory

  """
  # Directory name includes a timestamp to keep it unique and sortable
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, prefix=dir_prefix)
3168
3169 3170 -def StartImportExportDaemon(mode, opts, host, port, instance, component, 3171 ieio, ieioargs):
3172 """Starts an import or export daemon. 3173 3174 @param mode: Import/output mode 3175 @type opts: L{objects.ImportExportOptions} 3176 @param opts: Daemon options 3177 @type host: string 3178 @param host: Remote host for export (None for import) 3179 @type port: int 3180 @param port: Remote port for export (None for import) 3181 @type instance: L{objects.Instance} 3182 @param instance: Instance object 3183 @type component: string 3184 @param component: which part of the instance is transferred now, 3185 e.g. 'disk/0' 3186 @param ieio: Input/output type 3187 @param ieioargs: Input/output arguments 3188 3189 """ 3190 if mode == constants.IEM_IMPORT: 3191 prefix = "import" 3192 3193 if not (host is None and port is None): 3194 _Fail("Can not specify host or port on import") 3195 3196 elif mode == constants.IEM_EXPORT: 3197 prefix = "export" 3198 3199 if host is None or port is None: 3200 _Fail("Host and port must be specified for an export") 3201 3202 else: 3203 _Fail("Invalid mode %r", mode) 3204 3205 if (opts.key_name is None) ^ (opts.ca_pem is None): 3206 _Fail("Cluster certificate can only be used for both key and CA") 3207 3208 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \ 3209 _GetImportExportIoCommand(instance, mode, ieio, ieioargs) 3210 3211 if opts.key_name is None: 3212 # Use server.pem 3213 key_path = constants.NODED_CERT_FILE 3214 cert_path = constants.NODED_CERT_FILE 3215 assert opts.ca_pem is None 3216 else: 3217 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR, 3218 opts.key_name) 3219 assert opts.ca_pem is not None 3220 3221 for i in [key_path, cert_path]: 3222 if not os.path.exists(i): 3223 _Fail("File '%s' does not exist" % i) 3224 3225 status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component)) 3226 try: 3227 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE) 3228 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE) 3229 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE) 3230 3231 if opts.ca_pem is 
None: 3232 # Use server.pem 3233 ca = utils.ReadFile(constants.NODED_CERT_FILE) 3234 else: 3235 ca = opts.ca_pem 3236 3237 # Write CA file 3238 utils.WriteFile(ca_file, data=ca, mode=0400) 3239 3240 cmd = [ 3241 constants.IMPORT_EXPORT_DAEMON, 3242 status_file, mode, 3243 "--key=%s" % key_path, 3244 "--cert=%s" % cert_path, 3245 "--ca=%s" % ca_file, 3246 ] 3247 3248 if host: 3249 cmd.append("--host=%s" % host) 3250 3251 if port: 3252 cmd.append("--port=%s" % port) 3253 3254 if opts.ipv6: 3255 cmd.append("--ipv6") 3256 else: 3257 cmd.append("--ipv4") 3258 3259 if opts.compress: 3260 cmd.append("--compress=%s" % opts.compress) 3261 3262 if opts.magic: 3263 cmd.append("--magic=%s" % opts.magic) 3264 3265 if exp_size is not None: 3266 cmd.append("--expected-size=%s" % exp_size) 3267 3268 if cmd_prefix: 3269 cmd.append("--cmd-prefix=%s" % cmd_prefix) 3270 3271 if cmd_suffix: 3272 cmd.append("--cmd-suffix=%s" % cmd_suffix) 3273 3274 if mode == constants.IEM_EXPORT: 3275 # Retry connection a few times when connecting to remote peer 3276 cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES) 3277 cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT) 3278 elif opts.connect_timeout is not None: 3279 assert mode == constants.IEM_IMPORT 3280 # Overall timeout for establishing connection while listening 3281 cmd.append("--connect-timeout=%s" % opts.connect_timeout) 3282 3283 logfile = _InstanceLogName(prefix, instance.os, instance.name, component) 3284 3285 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has 3286 # support for receiving a file descriptor for output 3287 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file, 3288 output=logfile) 3289 3290 # The import/export name is simply the status directory name 3291 return os.path.basename(status_dir) 3292 3293 except Exception: 3294 shutil.rmtree(status_dir, ignore_errors=True) 3295 raise
3296
3297 3298 -def GetImportExportStatus(names):
3299 """Returns import/export daemon status. 3300 3301 @type names: sequence 3302 @param names: List of names 3303 @rtype: List of dicts 3304 @return: Returns a list of the state of each named import/export or None if a 3305 status couldn't be read 3306 3307 """ 3308 result = [] 3309 3310 for name in names: 3311 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name, 3312 _IES_STATUS_FILE) 3313 3314 try: 3315 data = utils.ReadFile(status_file) 3316 except EnvironmentError, err: 3317 if err.errno != errno.ENOENT: 3318 raise 3319 data = None 3320 3321 if not data: 3322 result.append(None) 3323 continue 3324 3325 result.append(serializer.LoadJson(data)) 3326 3327 return result
3328
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: Import/export name (status directory name)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
  daemon_pid = utils.ReadLockedPidFile(pid_file)

  if daemon_pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, daemon_pid)
    # The process may already have exited; that's not an error
    utils.IgnoreProcessNotFound(os.kill, daemon_pid, signal.SIGTERM)
3343
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: Import/export name (status directory name)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  # Kill a still-running daemon without waiting for it
  daemon_pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir,
                                                      _IES_PID_FILE))
  if daemon_pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, daemon_pid)
    utils.KillProcess(daemon_pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
3364
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  my_name = netutils.Hostname.GetSysName()

  # First pass: fill in the physical IDs on all disks; this must happen
  # for every disk before any device lookup can fail below
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # Second pass: resolve each disk to its block device
  bdevs = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(found)
  return bdevs
3383
3384 3385 -def DrbdDisconnectNet(nodes_ip, disks):
3386 """Disconnects the network on a list of drbd devices. 3387 3388 """ 3389 bdevs = _FindDisks(nodes_ip, disks) 3390 3391 # disconnect disks 3392 for rd in bdevs: 3393 try: 3394 rd.DisconnectNet() 3395 except errors.BlockDeviceError, err: 3396 _Fail("Can't change network configuration to standalone mode: %s", 3397 err, exc=True)
3398
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: node IP information, passed to L{_FindDisks} when
      looking up the block devices
  @param disks: list of disk objects whose network should be (re)attached
  @type instance_name: string
  @param instance_name: instance name, used to create the block device
      symlinks when C{multimaster} is set
  @type multimaster: boolean
  @param multimaster: whether to reconnect in multimaster mode; if set,
      the devices are also switched to primary mode at the end

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    # Create the instance's block device symlinks before reconnecting
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    # Retry callback: raises L{utils.RetryAgain} until every device is
    # either connected or in resync
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple; (boolean, int)
  @return: whether all devices are done resyncing, and the minimum sync
      percentage observed across them

  """
  def _CheckDevice(dev):
    # Raises RetryAgain until the device is connected or in resync
    proc_stats = dev.GetProcStatus()
    if not (proc_stats.is_connected or proc_stats.is_in_resync):
      raise utils.RetryAgain()
    return proc_stats

  bdevs = _FindDisks(nodes_ip, disks)

  alldone = True
  min_resync = 100
  for dev in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_CheckDevice, 1, 15, args=[dev])
    except utils.RetryTimeout:
      stats = dev.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", dev, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3491 3492 -def GetDrbdUsermodeHelper():
3493 """Returns DRBD usermode helper currently configured. 3494 3495 """ 3496 try: 3497 return bdev.BaseDRBD.GetUsermodeHelper() 3498 except errors.BlockDeviceError, err: 3499 _Fail(str(err