Package ganeti :: Module backend
[hide private]
[frames] | no frames]

Source Code for Module ganeti.backend

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Functions used by the node daemon 
  23   
  24  @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in 
  25       the L{UploadFile} function 
  26   
  27  """ 
  28   
  29   
  30  import os 
  31  import os.path 
  32  import shutil 
  33  import time 
  34  import stat 
  35  import errno 
  36  import re 
  37  import subprocess 
  38  import random 
  39  import logging 
  40  import tempfile 
  41  import zlib 
  42  import base64 
  43   
  44  from ganeti import errors 
  45  from ganeti import utils 
  46  from ganeti import ssh 
  47  from ganeti import hypervisor 
  48  from ganeti import constants 
  49  from ganeti import bdev 
  50  from ganeti import objects 
  51  from ganeti import ssconf 
def _GetConfig():
  """Build a fresh L{ssconf.SimpleStore}.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
62
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, passed through to the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly created SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
75
76 77 -def _Decompress(data):
78 """Unpacks data compressed by the RPC client. 79 80 @type data: list or tuple 81 @param data: Data sent by RPC client 82 @rtype: str 83 @return: Decompressed data 84 85 """ 86 assert isinstance(data, (list, tuple)) 87 assert len(data) == 2 88 (encoding, content) = data 89 if encoding == constants.RPC_ENCODING_NONE: 90 return content 91 elif encoding == constants.RPC_ENCODING_ZLIB_BASE64: 92 return zlib.decompress(base64.b64decode(content)) 93 else: 94 raise AssertionError("Unknown data encoding")
95
96 97 -def _CleanDirectory(path, exclude=None):
98 """Removes all regular files in a directory. 99 100 @type path: str 101 @param path: the directory to clean 102 @type exclude: list 103 @param exclude: list of files to be excluded, defaults 104 to the empty list 105 106 """ 107 if not os.path.isdir(path): 108 return 109 if exclude is None: 110 exclude = [] 111 else: 112 # Normalize excluded paths 113 exclude = [os.path.normpath(i) for i in exclude] 114 115 for rel_name in utils.ListVisibleFiles(path): 116 full_name = os.path.normpath(os.path.join(path, rel_name)) 117 if full_name in exclude: 118 continue 119 if os.path.isfile(full_name) and not os.path.islink(full_name): 120 utils.RemoveFile(full_name)
121
def _BuildUploadFileList():
  """Compute the set of files that may be uploaded to this node.

  Kept as a function so that the set is computed a single time, when
  the module is imported.

  @rtype: frozenset
  @return: the paths of the files accepted for upload

  """
  allowed = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  return frozenset(allowed)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
def JobQueuePurge():
  """Remove the job queue files and the archived jobs.

  The queue directory is emptied except for the job queue lock file;
  the archive directory is emptied completely.

  @rtype: None

  """
  kept_files = [constants.JOB_QUEUE_LOCK_FILE]
  _CleanDirectory(constants.QUEUE_DIR, exclude=kept_files)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
148
def GetMasterInfo():
  """Return the master-related settings of the cluster.

  This is a utility function used both in this module and by the node
  daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_node) as read from the
      configuration store, or (None, None, None) if reading it raises
      L{errors.ConfigurationError}

  """
  try:
    store = _GetConfig()
    netdev = store.GetMasterNetdev()
    ip_addr = store.GetMasterIP()
    node = store.GetMasterNode()
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)

  return (netdev, ip_addr, node)
170
def StartMaster(start_daemons, no_voting):
  """Activate the local node as master node.

  The master IP address is always brought up on this node, unless it
  already answers on the node daemon port: in that case it is either
  ours already (nothing to do) or held by someone else (failure).
  Depending on C{start_daemons}, the master daemons are started too;
  this is attempted even if the IP activation failed.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: boolean
  @return: False if the master network device is not known, if the
      master IP is held by another host, if adding the IP failed or if
      a daemon could not be started; True otherwise

  """
  success = True
  netdev, ip_addr, _ = GetMasterInfo()
  if not netdev:
    return False

  if not utils.TcpPing(ip_addr, constants.DEFAULT_NODED_PORT):
    # nobody answers on the master IP, so configure it here
    add_result = utils.RunCmd(["ip", "address", "add", "%s/32" % ip_addr,
                               "dev", netdev, "label",
                               "%s:0" % netdev])
    if add_result.failed:
      logging.error("Can't activate master IP: %s", add_result.output)
      success = False

    # the exit code of arping is deliberately ignored
    utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", netdev,
                  "-s", ip_addr, ip_addr])
  elif utils.OwnIpAddress(ip_addr):
    # the address is already configured on this node
    logging.debug("Already started")
  else:
    logging.error("Someone else has the master ip, not activating")
    success = False

  if start_daemons:
    # daemon name -> extra command line arguments
    daemon_args = {
      'ganeti-masterd': [],
      'ganeti-rapi': [],
      }
    if no_voting:
      daemon_args['ganeti-masterd'].extend(['--no-voting', '--yes-do-it'])

    for name in daemon_args:
      start_result = utils.RunCmd([name] + daemon_args[name])
      if start_result.failed:
        logging.error("Can't start daemon %s: %s", name, start_result.output)
        success = False

  return success
229
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP address is always removed from the master network
  device; a failure to do so is logged but otherwise ignored.
  Depending on C{stop_daemons}, the master daemons are killed too.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: False if the master network device is not known,
      True otherwise

  """
  netdev, ip_addr, _ = GetMasterInfo()
  if not netdev:
    return False

  del_result = utils.RunCmd(["ip", "address", "del", "%s/32" % ip_addr,
                             "dev", netdev])
  if del_result.failed:
    # logged only; the function still reports success
    logging.error("Can't remove the master IP, error: %s", del_result.output)

  if stop_daemons:
    # kill the rapi daemon first, then the master daemon
    for pid_name in (constants.RAPI_PID, constants.MASTERD_PID):
      pid_file = utils.DaemonPidFileName(pid_name)
      utils.KillProcess(utils.ReadPidFile(pid_file))

  return True
260
261 262 -def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
263 """Joins this node to the cluster. 264 265 This does the following: 266 - updates the hostkeys of the machine (rsa and dsa) 267 - adds the ssh private key to the user 268 - adds the ssh public key to the users' authorized_keys file 269 270 @type dsa: str 271 @param dsa: the DSA private key to write 272 @type dsapub: str 273 @param dsapub: the DSA public key to write 274 @type rsa: str 275 @param rsa: the RSA private key to write 276 @type rsapub: str 277 @param rsapub: the RSA public key to write 278 @type sshkey: str 279 @param sshkey: the SSH private key to write 280 @type sshpub: str 281 @param sshpub: the SSH public key to write 282 @rtype: boolean 283 @return: the success of the operation 284 285 """ 286 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600), 287 (constants.SSH_HOST_RSA_PUB, rsapub, 0644), 288 (constants.SSH_HOST_DSA_PRIV, dsa, 0600), 289 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)] 290 for name, content, mode in sshd_keys: 291 utils.WriteFile(name, data=content, mode=mode) 292 293 try: 294 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS, 295 mkdir=True) 296 except errors.OpExecError, err: 297 msg = "Error while processing user ssh files" 298 logging.exception(msg) 299 return (False, "%s: %s" % (msg, err)) 300 301 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]: 302 utils.WriteFile(name, data=content, mode=0600) 303 304 utils.AddAuthorizedKey(auth_keys, sshpub) 305 306 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"]) 307 308 return (True, "Node added successfully")
309
def LeaveCluster():
  """Clean up the current node so it can be removed from the cluster.

  The data directory and the job queue are emptied, the ganeti ssh key
  is dropped from the authorized keys file and the key pair is deleted.

  If processing is successful, an L{errors.QuitGanetiException} is
  raised; it is used as a special case to shut down the node daemon.
  If the user's ssh files cannot be determined, the error is logged and
  the function returns normally without raising.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    # at most 8192 characters of the public key file are used
    key_data = pub_file.read(8192)
    utils.RemoveAuthorizedKey(auth_keys, key_data)
  finally:
    pub_file.close()

  for key_path in (priv_key, pub_key):
    utils.RemoveFile(key_path)

  # Hand a reassuring message to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
342
def GetNodeInfo(vgname, hypervisor_type):
  """Collect assorted information about the node into a dictionary.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - bootid is the content of /proc/sys/kernel/random/boot_id
      - whatever the hypervisor's GetNodeInfo() reports; the original
        documentation lists memory_dom0, memory_free and memory_total
        (in MiB)

  """
  vg_data = _GetVGInfo(vgname)
  info = {}
  info['vg_size'] = vg_data['vg_size']
  info['vg_free'] = vg_data['vg_free']

  hv_data = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_data is not None:
    info.update(hv_data)

  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    boot_id = boot_id_file.read(128)
    info["bootid"] = boot_id.rstrip("\n")
  finally:
    boot_id_file.close()

  return info
378
379 380 -def VerifyNode(what, cluster_name):
381 """Verify the status of the local node. 382 383 Based on the input L{what} parameter, various checks are done on the 384 local node. 385 386 If the I{filelist} key is present, this list of 387 files is checksummed and the file/checksum pairs are returned. 388 389 If the I{nodelist} key is present, we check that we have 390 connectivity via ssh with the target nodes (and check the hostname 391 report). 392 393 If the I{node-net-test} key is present, we check that we have 394 connectivity to the given nodes via both primary IP and, if 395 applicable, secondary IPs. 396 397 @type what: C{dict} 398 @param what: a dictionary of things to check: 399 - filelist: list of files for which to compute checksums 400 - nodelist: list of nodes we should check ssh communication with 401 - node-net-test: list of nodes we should check node daemon port 402 connectivity with 403 - hypervisor: list with hypervisors to run the verify for 404 @rtype: dict 405 @return: a dictionary with the same keys as the input dict, and 406 values representing the result of the checks 407 408 """ 409 result = {} 410 411 if constants.NV_HYPERVISOR in what: 412 result[constants.NV_HYPERVISOR] = tmp = {} 413 for hv_name in what[constants.NV_HYPERVISOR]: 414 tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify() 415 416 if constants.NV_FILELIST in what: 417 result[constants.NV_FILELIST] = utils.FingerprintFiles( 418 what[constants.NV_FILELIST]) 419 420 if constants.NV_NODELIST in what: 421 result[constants.NV_NODELIST] = tmp = {} 422 random.shuffle(what[constants.NV_NODELIST]) 423 for node in what[constants.NV_NODELIST]: 424 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node) 425 if not success: 426 tmp[node] = message 427 428 if constants.NV_NODENETTEST in what: 429 result[constants.NV_NODENETTEST] = tmp = {} 430 my_name = utils.HostInfo().name 431 my_pip = my_sip = None 432 for name, pip, sip in what[constants.NV_NODENETTEST]: 433 if name == my_name: 434 my_pip = pip 435 my_sip 
= sip 436 break 437 if not my_pip: 438 tmp[my_name] = ("Can't find my own primary/secondary IP" 439 " in the node list") 440 else: 441 port = utils.GetNodeDaemonPort() 442 for name, pip, sip in what[constants.NV_NODENETTEST]: 443 fail = [] 444 if not utils.TcpPing(pip, port, source=my_pip): 445 fail.append("primary") 446 if sip != pip: 447 if not utils.TcpPing(sip, port, source=my_sip): 448 fail.append("secondary") 449 if fail: 450 tmp[name] = ("failure using the %s interface(s)" % 451 " and ".join(fail)) 452 453 if constants.NV_LVLIST in what: 454 result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST]) 455 456 if constants.NV_INSTANCELIST in what: 457 result[constants.NV_INSTANCELIST] = GetInstanceList( 458 what[constants.NV_INSTANCELIST]) 459 460 if constants.NV_VGLIST in what: 461 result[constants.NV_VGLIST] = ListVolumeGroups() 462 463 if constants.NV_VERSION in what: 464 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION, 465 constants.RELEASE_VERSION) 466 467 if constants.NV_HVINFO in what: 468 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO]) 469 result[constants.NV_HVINFO] = hyper.GetNodeInfo() 470 471 if constants.NV_DRBDLIST in what: 472 try: 473 used_minors = bdev.DRBD8.GetUsedDevs().keys() 474 except errors.BlockDeviceError, err: 475 logging.warning("Can't get used minors list", exc_info=True) 476 used_minors = str(err) 477 result[constants.NV_DRBDLIST] = used_minors 478 479 return result
480
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all logical volumes (key) with value being a tuple
      of their size (in MiB, as the string printed by lvs), inactive
      and online status::

        {'test1': ('20.06', True, True)}

      "inactive" is True when the fifth C{lv_attr} character is C{-}
      and "online" is True when the sixth one is C{o} (per lvs(8),
      these are the state and the device-open fields).

      In case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # Raw string, so the backslashes reach the regex engine untouched.
  # Newer LVM versions report more than six lv_attr characters; only
  # the first six are looked at, so longer values are accepted too.
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
521
def ListVolumeGroups():
  """List the volume groups and their size.

  This simply forwards to L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  vg_sizes = utils.ListVolumeGroups()
  return vg_sizes
532
def NodeVolumes():
  """List all logical volumes present on this node.

  @rtype: list
  @return:
      A list of dictionaries, each having four keys:
        - name: the logical volume name,
        - size: the size of the logical volume
        - dev: the physical device on which the LV lives
        - vg: the volume group to which it belongs

      If the lvs command fails, the error is logged and an empty list
      is returned.

      Since a logical volume can live on multiple physical volumes,
      the same logical volume may show up several times in the list.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  volumes = []
  for line in result.stdout.splitlines():
    # lines without the four expected fields are skipped
    if line.count('|') >= 3:
      fields = line.split('|')
      # anything from the first '(' onwards is dropped from the device
      device = fields[2].strip().split('(')[0]
      volumes.append({
        'name': fields[0].strip(),
        'size': fields[1].strip(),
        'dev': device,
        'vg': fields[3].strip(),
        })

  return volumes
def BridgesExist(bridges_list):
  """Check whether every bridge in a list exists on the current node.

  The check stops at the first bridge that is missing.

  @type bridges_list: list
  @param bridges_list: the names of the bridges to look for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for name in bridges_list:
    if utils.BridgeExists(name):
      continue
    return False

  return True
591
def GetInstanceList(hypervisor_list):
  """Return the names of the instances known to the given hypervisors.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com
  @raise errors.HypervisorError: re-raised, after being logged, if a
      hypervisor fails to enumerate its instances

  """
  instances = []
  for hv_name in hypervisor_list:
    try:
      hyper = hypervisor.GetHypervisor(hv_name)
      instances.extend(hyper.ListInstances())
    except errors.HypervisorError:
      logging.exception("Error enumerating instances for hypevisor %s", hv_name)
      raise

  return instances
615
def GetInstanceInfo(instance, hname):
  """Return the runtime information of one instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: an empty dictionary if the hypervisor reports nothing for
      the instance, otherwise a dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  hyper = hypervisor.GetHypervisor(hname)
  iinfo = hyper.GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  # only three fields of the hypervisor's answer are passed on
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
641
def GetInstanceMigratable(instance):
  """Tell whether an instance can be migrated.

  The instance must be running, and every one of its disks must have
  its block device symlink in place.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    return (False, 'not running')

  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if os.path.islink(link_name):
      continue
    return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
665
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise errors.HypervisorError: if two hypervisors report the same
      instance name with different memory or vcpus values

  """
  # Only the static parameters are compared for duplicates; state and
  # time can legitimately change between the invocations of the
  # different hypervisors.
  static_keys = ('memory', 'vcpus')
  all_info = {}

  for hv_name in hypervisor_list:
    hv_data = hypervisor.GetHypervisor(hv_name).GetAllInstancesInfo()
    if not hv_data:
      continue

    for name, _, memory, vcpus, state, times in hv_data:
      current = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in all_info:
        known = all_info[name]
        for key in static_keys:
          if current[key] != known[key]:
            raise errors.HypervisorError("Instance %s is running twice"
                                         " with different parameters" % name)
      # the most recently seen data wins
      all_info[name] = current

  return all_info
708
709 710 -def InstanceOsAdd(instance):
711 """Add an OS to an instance. 712 713 @type instance: L{objects.Instance} 714 @param instance: Instance whose OS is to be installed 715 @rtype: boolean 716 @return: the success of the operation 717 718 """ 719 try: 720 inst_os = OSFromDisk(instance.os) 721 except errors.InvalidOS, err: 722 os_name, os_dir, os_err = err.args 723 if os_dir is None: 724 return (False, "Can't find OS '%s': %s" % (os_name, os_err)) 725 else: 726 return (False, "Error parsing OS '%s' in directory %s: %s" % 727 (os_name, os_dir, os_err)) 728 729 create_env = OSEnvironment(instance) 730 731 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os, 732 instance.name, int(time.time())) 733 734 result = utils.RunCmd([inst_os.create_script], env=create_env, 735 cwd=inst_os.path, output=logfile,) 736 if result.failed: 737 logging.error("os create command '%s' returned error: %s, logfile: %s," 738 " output: %s", result.cmd, result.fail_reason, logfile, 739 result.output) 740 lines = [utils.SafeEncode(val) 741 for val in utils.TailFile(logfile, lines=20)] 742 return (False, "OS create script failed (%s), last lines in the" 743 " log file:\n%s" % (result.fail_reason, "\n".join(lines))) 744 745 return (True, "Successfully installed")
746
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  The script is run with the OS environment of the instance plus
  C{OLD_INSTANCE_NAME}; its output goes to a log file under
  L{constants.LOG_OS_DIR}. On failure the last 20 lines of that file
  are included in the returned message.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS rename script is to be run
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: (success, message)

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # the message names the rename script, which is what was run here
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
780
def _GetVGInfo(vg_name):
  """Query the size information of a volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If the vgs command fails or its output cannot be parsed, the error
    is logged and the same keys are returned, all set to None.

  """
  unknown = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  vgs = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                      "--nosuffix", "--units=m", "--separator=:", vg_name])
  if vgs.failed:
    logging.error("volume group %s not present", vg_name)
    return unknown

  fields = vgs.stdout.strip().rstrip(':').split(':')
  if len(fields) != 3:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(fields))
    return unknown

  try:
    # sizes are rounded to the nearest integer
    return {
      "vg_size": int(round(float(fields[0]), 0)),
      "vg_free": int(round(float(fields[1]), 0)),
      "pv_count": int(fields[2]),
      }
  except (TypeError, ValueError):
    logging.exception("Fail to parse vgs output")
    return unknown
820
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for one disk of an instance.

  @param instance_name: the name of the instance
  @param idx: the disk index
  @return: the path C{<instance_name>:<idx>} inside
      L{constants.DISK_LINKS_DIR}

  """
  links_dir = constants.DISK_LINKS_DIR
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(links_dir, link_basename)
825
826 827 -def _SymlinkBlockDev(instance_name, device_path, idx):
828 """Set up symlinks to a instance's block device. 829 830 This is an auxiliary function run when an instance is start (on the primary 831 node) or when an instance is migrated (on the target node). 832 833 834 @param instance_name: the name of the target instance 835 @param device_path: path of the physical block device, on the node 836 @param idx: the disk index 837 @return: absolute path to the disk's symlink 838 839 """ 840 link_name = _GetBlockDevSymlinkPath(instance_name, idx) 841 try: 842 os.symlink(device_path, link_name) 843 except OSError, err: 844 if err.errno == errno.EEXIST: 845 if (not os.path.islink(link_name) or 846 os.readlink(link_name) != device_path): 847 os.remove(link_name) 848 os.symlink(device_path, link_name) 849 else: 850 raise 851 852 return link_name
853 866
867 868 -def _GatherAndLinkBlockDevs(instance):
869 """Set up an instance's block device(s). 870 871 This is run on the primary node at instance startup. The block 872 devices must be already assembled. 873 874 @type instance: L{objects.Instance} 875 @param instance: the instance whose disks we shoul assemble 876 @rtype: list 877 @return: list of (disk_object, device_path) 878 879 """ 880 block_devices = [] 881 for idx, disk in enumerate(instance.disks): 882 device = _RecursiveFindBD(disk) 883 if device is None: 884 raise errors.BlockDeviceError("Block device '%s' is not set up." % 885 str(disk)) 886 device.Open() 887 try: 888 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx) 889 except OSError, e: 890 raise errors.BlockDeviceError("Cannot create block device symlink: %s" % 891 e.strerror) 892 893 block_devices.append((disk, link_name)) 894 895 return block_devices
896
897 898 -def StartInstance(instance):
899 """Start an instance. 900 901 @type instance: L{objects.Instance} 902 @param instance: the instance object 903 @rtype: boolean 904 @return: whether the startup was successful or not 905 906 """ 907 running_instances = GetInstanceList([instance.hypervisor]) 908 909 if instance.name in running_instances: 910 return (True, "Already running") 911 912 try: 913 block_devices = _GatherAndLinkBlockDevs(instance) 914 hyper = hypervisor.GetHypervisor(instance.hypervisor) 915 hyper.StartInstance(instance, block_devices) 916 except errors.BlockDeviceError, err: 917 logging.exception("Failed to start instance") 918 return (False, "Block device error: %s" % str(err)) 919 except errors.HypervisorError, err: 920 logging.exception("Failed to start instance") 921 _RemoveBlockDevLinks(instance.name, instance.disks) 922 return (False, "Hypervisor error: %s" % str(err)) 923 924 return (True, "Instance started successfully")
925
926 927 -def InstanceShutdown(instance):
928 """Shut an instance down. 929 930 @note: this functions uses polling with a hardcoded timeout. 931 932 @type instance: L{objects.Instance} 933 @param instance: the instance object 934 @rtype: boolean 935 @return: whether the startup was successful or not 936 937 """ 938 hv_name = instance.hypervisor 939 running_instances = GetInstanceList([hv_name]) 940 941 if instance.name not in running_instances: 942 return (True, "Instance already stopped") 943 944 hyper = hypervisor.GetHypervisor(hv_name) 945 try: 946 hyper.StopInstance(instance) 947 except errors.HypervisorError, err: 948 msg = "Failed to stop instance %s: %s" % (instance.name, err) 949 logging.error(msg) 950 return (False, msg) 951 952 # test every 10secs for 2min 953 954 time.sleep(1) 955 for _ in range(11): 956 if instance.name not in GetInstanceList([hv_name]): 957 break 958 time.sleep(10) 959 else: 960 # the shutdown did not succeed 961 logging.error("Shutdown of '%s' unsuccessful, using destroy", 962 instance.name) 963 964 try: 965 hyper.StopInstance(instance, force=True) 966 except errors.HypervisorError, err: 967 msg = "Failed to force stop instance %s: %s" % (instance.name, err) 968 logging.error(msg) 969 return (False, msg) 970 971 time.sleep(1) 972 if instance.name in GetInstanceList([hv_name]): 973 msg = ("Could not shutdown instance %s even by destroy" % 974 instance.name) 975 logging.error(msg) 976 return (False, msg) 977 978 _RemoveBlockDevLinks(instance.name, instance.disks) 979 980 return (True, "Instance has been shutdown successfully")
981
982 983 -def InstanceReboot(instance, reboot_type):
984 """Reboot an instance. 985 986 @type instance: L{objects.Instance} 987 @param instance: the instance object to reboot 988 @type reboot_type: str 989 @param reboot_type: the type of reboot, one the following 990 constants: 991 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the 992 instance OS, do not recreate the VM 993 - L{constants.INSTANCE_REBOOT_HARD}: tear down and 994 restart the VM (at the hypervisor level) 995 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is 996 not accepted here, since that mode is handled differently, in 997 cmdlib, and translates into full stop and start of the 998 instance (instead of a call_instance_reboot RPC) 999 @rtype: boolean 1000 @return: the success of the operation 1001 1002 """ 1003 running_instances = GetInstanceList([instance.hypervisor]) 1004 1005 if instance.name not in running_instances: 1006 msg = "Cannot reboot instance %s that is not running" % instance.name 1007 logging.error(msg) 1008 return (False, msg) 1009 1010 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1011 if reboot_type == constants.INSTANCE_REBOOT_SOFT: 1012 try: 1013 hyper.RebootInstance(instance) 1014 except errors.HypervisorError, err: 1015 msg = "Failed to soft reboot instance %s: %s" % (instance.name, err) 1016 logging.error(msg) 1017 return (False, msg) 1018 elif reboot_type == constants.INSTANCE_REBOOT_HARD: 1019 try: 1020 stop_result = InstanceShutdown(instance) 1021 if not stop_result[0]: 1022 return stop_result 1023 return StartInstance(instance) 1024 except errors.HypervisorError, err: 1025 msg = "Failed to hard reboot instance %s: %s" % (instance.name, err) 1026 logging.error(msg) 1027 return (False, msg) 1028 else: 1029 return (False, "Invalid reboot_type received: %s" % (reboot_type,)) 1030 1031 return (True, "Reboot successful")
1032
1033 1034 -def MigrationInfo(instance):
1035 """Gather information about an instance to be migrated. 1036 1037 @type instance: L{objects.Instance} 1038 @param instance: the instance definition 1039 1040 """ 1041 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1042 try: 1043 info = hyper.MigrationInfo(instance) 1044 except errors.HypervisorError, err: 1045 msg = "Failed to fetch migration information" 1046 logging.exception(msg) 1047 return (False, '%s: %s' % (msg, err)) 1048 return (True, info)
1049
1050 1051 -def AcceptInstance(instance, info, target):
1052 """Prepare the node to accept an instance. 1053 1054 @type instance: L{objects.Instance} 1055 @param instance: the instance definition 1056 @type info: string/data (opaque) 1057 @param info: migration information, from the source node 1058 @type target: string 1059 @param target: target host (usually ip), on this node 1060 1061 """ 1062 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1063 try: 1064 hyper.AcceptInstance(instance, info, target) 1065 except errors.HypervisorError, err: 1066 msg = "Failed to accept instance" 1067 logging.exception(msg) 1068 return (False, '%s: %s' % (msg, err)) 1069 return (True, "Accept successful")
1070
1071 1072 -def FinalizeMigration(instance, info, success):
1073 """Finalize any preparation to accept an instance. 1074 1075 @type instance: L{objects.Instance} 1076 @param instance: the instance definition 1077 @type info: string/data (opaque) 1078 @param info: migration information, from the source node 1079 @type success: boolean 1080 @param success: whether the migration was a success or a failure 1081 1082 """ 1083 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1084 try: 1085 hyper.FinalizeMigration(instance, info, success) 1086 except errors.HypervisorError, err: 1087 msg = "Failed to finalize migration" 1088 logging.exception(msg) 1089 return (False, '%s: %s' % (msg, err)) 1090 return (True, "Migration Finalized")
1091
1092 1093 -def MigrateInstance(instance, target, live):
1094 """Migrates an instance to another node. 1095 1096 @type instance: L{objects.Instance} 1097 @param instance: the instance definition 1098 @type target: string 1099 @param target: the target node name 1100 @type live: boolean 1101 @param live: whether the migration should be done live or not (the 1102 interpretation of this parameter is left to the hypervisor) 1103 @rtype: tuple 1104 @return: a tuple of (success, msg) where: 1105 - succes is a boolean denoting the success/failure of the operation 1106 - msg is a string with details in case of failure 1107 1108 """ 1109 hyper = hypervisor.GetHypervisor(instance.hypervisor) 1110 1111 try: 1112 hyper.MigrateInstance(instance.name, target, live) 1113 except errors.HypervisorError, err: 1114 msg = "Failed to migrate instance" 1115 logging.exception(msg) 1116 return (False, "%s: %s" % (msg, err)) 1117 return (True, "Migration successful")
1118
1119 1120 -def BlockdevCreate(disk, size, owner, on_primary, info):
1121 """Creates a block device for an instance. 1122 1123 @type disk: L{objects.Disk} 1124 @param disk: the object describing the disk we should create 1125 @type size: int 1126 @param size: the size of the physical underlying device, in MiB 1127 @type owner: str 1128 @param owner: the name of the instance for which disk is created, 1129 used for device cache data 1130 @type on_primary: boolean 1131 @param on_primary: indicates if it is the primary node or not 1132 @type info: string 1133 @param info: string that will be sent to the physical device 1134 creation, used for example to set (LVM) tags on LVs 1135 1136 @return: the new unique_id of the device (this can sometime be 1137 computed only after creation), or None. On secondary nodes, 1138 it's not required to return anything. 1139 1140 """ 1141 clist = [] 1142 if disk.children: 1143 for child in disk.children: 1144 try: 1145 crdev = _RecursiveAssembleBD(child, owner, on_primary) 1146 except errors.BlockDeviceError, err: 1147 errmsg = "Can't assemble device %s: %s" % (child, err) 1148 logging.error(errmsg) 1149 return False, errmsg 1150 if on_primary or disk.AssembleOnSecondary(): 1151 # we need the children open in case the device itself has to 1152 # be assembled 1153 try: 1154 # pylint: disable-msg=E1103 1155 crdev.Open() 1156 except errors.BlockDeviceError, err: 1157 errmsg = "Can't make child '%s' read-write: %s" % (child, err) 1158 logging.error(errmsg) 1159 return False, errmsg 1160 clist.append(crdev) 1161 1162 try: 1163 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size) 1164 except errors.BlockDeviceError, err: 1165 return False, "Can't create block device: %s" % str(err) 1166 1167 if on_primary or disk.AssembleOnSecondary(): 1168 try: 1169 device.Assemble() 1170 except errors.BlockDeviceError, err: 1171 errmsg = ("Can't assemble device after creation, very" 1172 " unusual event: %s" % str(err)) 1173 logging.error(errmsg) 1174 return False, errmsg 1175 
device.SetSyncSpeed(constants.SYNC_SPEED) 1176 if on_primary or disk.OpenOnSecondary(): 1177 try: 1178 device.Open(force=True) 1179 except errors.BlockDeviceError, err: 1180 errmsg = ("Can't make device r/w after creation, very" 1181 " unusual event: %s" % str(err)) 1182 logging.error(errmsg) 1183 return False, errmsg 1184 DevCacheManager.UpdateCache(device.dev_path, owner, 1185 on_primary, disk.iv_name) 1186 1187 device.SetInfo(info) 1188 1189 physical_id = device.unique_id 1190 return True, physical_id
1191
1192 1193 -def BlockdevRemove(disk):
1194 """Remove a block device. 1195 1196 @note: This is intended to be called recursively. 1197 1198 @type disk: L{objects.Disk} 1199 @param disk: the disk object we should remove 1200 @rtype: boolean 1201 @return: the success of the operation 1202 1203 """ 1204 msgs = [] 1205 result = True 1206 try: 1207 rdev = _RecursiveFindBD(disk) 1208 except errors.BlockDeviceError, err: 1209 # probably can't attach 1210 logging.info("Can't attach to device %s in remove", disk) 1211 rdev = None 1212 if rdev is not None: 1213 r_path = rdev.dev_path 1214 try: 1215 rdev.Remove() 1216 except errors.BlockDeviceError, err: 1217 msgs.append(str(err)) 1218 result = False 1219 if result: 1220 DevCacheManager.RemoveCache(r_path) 1221 1222 if disk.children: 1223 for child in disk.children: 1224 c_status, c_msg = BlockdevRemove(child) 1225 result = result and c_status 1226 if c_msg: # not an empty message 1227 msgs.append(c_msg) 1228 1229 return (result, "; ".join(msgs))
1230
1231 1232 -def _RecursiveAssembleBD(disk, owner, as_primary):
1233 """Activate a block device for an instance. 1234 1235 This is run on the primary and secondary nodes for an instance. 1236 1237 @note: this function is called recursively. 1238 1239 @type disk: L{objects.Disk} 1240 @param disk: the disk we try to assemble 1241 @type owner: str 1242 @param owner: the name of the instance which owns the disk 1243 @type as_primary: boolean 1244 @param as_primary: if we should make the block device 1245 read/write 1246 1247 @return: the assembled device or None (in case no device 1248 was assembled) 1249 @raise errors.BlockDeviceError: in case there is an error 1250 during the activation of the children or the device 1251 itself 1252 1253 """ 1254 children = [] 1255 if disk.children: 1256 mcn = disk.ChildrenNeeded() 1257 if mcn == -1: 1258 mcn = 0 # max number of Nones allowed 1259 else: 1260 mcn = len(disk.children) - mcn # max number of Nones 1261 for chld_disk in disk.children: 1262 try: 1263 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary) 1264 except errors.BlockDeviceError, err: 1265 if children.count(None) >= mcn: 1266 raise 1267 cdev = None 1268 logging.error("Error in child activation (but continuing): %s", 1269 str(err)) 1270 children.append(cdev) 1271 1272 if as_primary or disk.AssembleOnSecondary(): 1273 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size) 1274 r_dev.SetSyncSpeed(constants.SYNC_SPEED) 1275 result = r_dev 1276 if as_primary or disk.OpenOnSecondary(): 1277 r_dev.Open() 1278 DevCacheManager.UpdateCache(r_dev.dev_path, owner, 1279 as_primary, disk.iv_name) 1280 1281 else: 1282 result = True 1283 return result
1284
1285 1286 -def BlockdevAssemble(disk, owner, as_primary):
1287 """Activate a block device for an instance. 1288 1289 This is a wrapper over _RecursiveAssembleBD. 1290 1291 @rtype: str or boolean 1292 @return: a C{/dev/...} path for primary nodes, and 1293 C{True} for secondary nodes 1294 1295 """ 1296 status = True 1297 result = "no error information" 1298 try: 1299 result = _RecursiveAssembleBD(disk, owner, as_primary) 1300 if isinstance(result, bdev.BlockDev): 1301 # pylint: disable-msg=E1103 1302 result = result.dev_path 1303 except errors.BlockDeviceError, err: 1304 result = "Error while assembling disk: %s" % str(err) 1305 status = False 1306 return (status, result)
1307
1308 1309 -def BlockdevShutdown(disk):
1310 """Shut down a block device. 1311 1312 First, if the device is assembled (Attach() is successful), then 1313 the device is shutdown. Then the children of the device are 1314 shutdown. 1315 1316 This function is called recursively. Note that we don't cache the 1317 children or such, as oppossed to assemble, shutdown of different 1318 devices doesn't require that the upper device was active. 1319 1320 @type disk: L{objects.Disk} 1321 @param disk: the description of the disk we should 1322 shutdown 1323 @rtype: boolean 1324 @return: the success of the operation 1325 1326 """ 1327 msgs = [] 1328 result = True 1329 r_dev = _RecursiveFindBD(disk) 1330 if r_dev is not None: 1331 r_path = r_dev.dev_path 1332 try: 1333 r_dev.Shutdown() 1334 DevCacheManager.RemoveCache(r_path) 1335 except errors.BlockDeviceError, err: 1336 msgs.append(str(err)) 1337 result = False 1338 1339 if disk.children: 1340 for child in disk.children: 1341 c_status, c_msg = BlockdevShutdown(child) 1342 result = result and c_status 1343 if c_msg: # not an empty message 1344 msgs.append(c_msg) 1345 1346 return (result, "; ".join(msgs))
1347
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Attach additional children to a mirrored block device.

  Nothing is added unless the parent and every new child can be
  found.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent device")
    return False

  found = []
  for disk in new_cdevs:
    found.append(_RecursiveFindBD(disk))
  if None in found:
    logging.error("Can't find new device(s) to add: %s:%s",
                  found, new_cdevs)
    return False

  parent.AddChildren(found)
  return True
1371
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Detach children from a mirrored block device.

  Each child is identified by its device path: the static one if the
  disk has it, otherwise the path of the device found on this node.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False

  paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      paths.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    paths.append(found.dev_path)

  parent.RemoveChildren(paths)
  return True
1403
def BlockdevGetmirrorstatus(disks):
  """Return the combined sync status of every disk in a list.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: one entry per disk, in the same order, each being the
      result of L{bdev.BlockDev.CombinedSyncStatus} for that disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  statuses = []
  for disk in disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    statuses.append(device.CombinedSyncStatus())
  return statuses
1425
def _RecursiveFindBD(disk):
  """Look up the real device behind a disk object.

  The children of the disk are looked up first (recursively) and
  passed, in order, to L{bdev.FindDevice}.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  child_devs = [_RecursiveFindBD(child) for child in (disk.children or [])]
  return bdev.FindDevice(disk.dev_type, disk.physical_id, child_devs,
                         disk.size)
1445
1446 1447 -def BlockdevFind(disk):
1448 """Check if a device is activated. 1449 1450 If it is, return information about the real device. 1451 1452 @type disk: L{objects.Disk} 1453 @param disk: the disk to find 1454 @rtype: None or tuple 1455 @return: None if the disk cannot be found, otherwise a 1456 tuple (device_path, major, minor, sync_percent, 1457 estimated_time, is_degraded) 1458 1459 """ 1460 try: 1461 rbd = _RecursiveFindBD(disk) 1462 except errors.BlockDeviceError, err: 1463 return (False, str(err)) 1464 if rbd is None: 1465 return (True, None) 1466 return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1467
def BlockdevGetsize(disks):
  """Compute the actual size of each of the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: one element per disk, in order: None if the disk cannot
      be found (or looking it up raised
      L{errors.BlockDeviceError}), otherwise the value returned by
      the device's GetActualSize

  """
  sizes = []
  for disk in disks:
    try:
      device = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # a lookup failure is reported exactly like a missing device
      device = None
    if device is None:
      sizes.append(None)
    else:
      sizes.append(device.GetActualSize())
  return sizes
1493
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. The write is only
  done for absolute paths that are listed in
  C{_ALLOWED_UPLOAD_FILES}; the payload is passed through
  C{_Decompress} before being written.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file, as accepted by
      C{_Decompress}
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; rejected file names are
      logged in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  if file_name not in _ALLOWED_UPLOAD_FILES:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode,
                  uid=uid, gid=gid, atime=atime, mtime=mtime)
  return True
1535
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Thin wrapper which creates a L{ssconf.SimpleStore} and hands the
  values to its WriteFiles method.

  @param values: passed unchanged to SimpleStore.WriteFiles

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1544
1545 1546 -def _ErrnoOrStr(err):
1547 """Format an EnvironmentError exception. 1548 1549 If the L{err} argument has an errno attribute, it will be looked up 1550 and converted into a textual C{E...} description. Otherwise the 1551 string representation of the error will be returned. 1552 1553 @type err: L{EnvironmentError} 1554 @param err: the exception to format 1555 1556 """ 1557 if hasattr(err, 'errno'): 1558 detail = errno.errorcode[err.errno] 1559 else: 1560 detail = str(err) 1561 return detail
1562
1563 1564 -def _OSOndiskVersion(name, os_dir):
1565 """Compute and return the API version of a given OS. 1566 1567 This function will try to read the API version of the OS given by 1568 the 'name' parameter and residing in the 'os_dir' directory. 1569 1570 @type name: str 1571 @param name: the OS name we should look for 1572 @type os_dir: str 1573 @param os_dir: the directory inwhich we should look for the OS 1574 @rtype: int or None 1575 @return: 1576 Either an integer denoting the version or None in the 1577 case when this is not a valid OS name. 1578 @raise errors.InvalidOS: if the OS cannot be found 1579 1580 """ 1581 api_file = os.path.sep.join([os_dir, "ganeti_api_version"]) 1582 1583 try: 1584 st = os.stat(api_file) 1585 except EnvironmentError, err: 1586 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not" 1587 " found (%s)" % _ErrnoOrStr(err)) 1588 1589 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)): 1590 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not" 1591 " a regular file") 1592 1593 try: 1594 f = open(api_file) 1595 try: 1596 api_versions = f.readlines() 1597 finally: 1598 f.close() 1599 except EnvironmentError, err: 1600 raise errors.InvalidOS(name, os_dir, "error while reading the" 1601 " API version (%s)" % _ErrnoOrStr(err)) 1602 1603 api_versions = [version.strip() for version in api_versions] 1604 try: 1605 api_versions = [int(version) for version in api_versions] 1606 except (TypeError, ValueError), err: 1607 raise errors.InvalidOS(name, os_dir, 1608 "API version is not integer (%s)" % str(err)) 1609 1610 return api_versions
1611
1612 1613 -def DiagnoseOS(top_dirs=None):
1614 """Compute the validity for all OSes. 1615 1616 @type top_dirs: list 1617 @param top_dirs: the list of directories in which to 1618 search (if not given defaults to 1619 L{constants.OS_SEARCH_PATH}) 1620 @rtype: list of L{objects.OS} 1621 @return: an OS object for each name in all the given 1622 directories 1623 1624 """ 1625 if top_dirs is None: 1626 top_dirs = constants.OS_SEARCH_PATH 1627 1628 result = [] 1629 for dir_name in top_dirs: 1630 if os.path.isdir(dir_name): 1631 try: 1632 f_names = utils.ListVisibleFiles(dir_name) 1633 except EnvironmentError, err: 1634 logging.exception("Can't list the OS directory %s", dir_name) 1635 break 1636 for name in f_names: 1637 try: 1638 os_inst = OSFromDisk(name, base_dir=dir_name) 1639 result.append(os_inst) 1640 except errors.InvalidOS, err: 1641 result.append(objects.OS.FromInvalidOS(err)) 1642 1643 return result
1644
1645 1646 -def OSFromDisk(name, base_dir=None):
1647 """Create an OS instance from disk. 1648 1649 This function will return an OS instance if the given name is a 1650 valid OS name. Otherwise, it will raise an appropriate 1651 L{errors.InvalidOS} exception, detailing why this is not a valid OS. 1652 1653 @type base_dir: string 1654 @keyword base_dir: Base directory containing OS installations. 1655 Defaults to a search in all the OS_SEARCH_PATH dirs. 1656 @rtype: L{objects.OS} 1657 @return: the OS instance if we find a valid one 1658 @raise errors.InvalidOS: if we don't find a valid OS 1659 1660 """ 1661 if base_dir is None: 1662 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir) 1663 else: 1664 os_dir = utils.FindFile(name, [base_dir], os.path.isdir) 1665 1666 if os_dir is None: 1667 raise errors.InvalidOS(name, None, "OS dir not found in search path") 1668 1669 api_versions = _OSOndiskVersion(name, os_dir) 1670 1671 if constants.OS_API_VERSION not in api_versions: 1672 raise errors.InvalidOS(name, os_dir, "API version mismatch" 1673 " (found %s want %s)" 1674 % (api_versions, constants.OS_API_VERSION)) 1675 1676 # OS Scripts dictionary, we will populate it with the actual script names 1677 os_scripts = dict.fromkeys(constants.OS_SCRIPTS) 1678 1679 for script in os_scripts: 1680 os_scripts[script] = os.path.sep.join([os_dir, script]) 1681 1682 try: 1683 st = os.stat(os_scripts[script]) 1684 except EnvironmentError, err: 1685 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" % 1686 (script, _ErrnoOrStr(err))) 1687 1688 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR: 1689 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" % 1690 script) 1691 1692 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)): 1693 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" % 1694 script) 1695 1696 1697 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS, 1698 create_script=os_scripts[constants.OS_SCRIPT_CREATE], 1699 
export_script=os_scripts[constants.OS_SCRIPT_EXPORT], 1700 import_script=os_scripts[constants.OS_SCRIPT_IMPORT], 1701 rename_script=os_scripts[constants.OS_SCRIPT_RENAME], 1702 api_versions=api_versions)
1703
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  Besides building the dictionary, every disk of the instance is
  looked up and opened.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  hvparams = instance.hvparams
  env = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'INSTANCE_OS': instance.os,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }

  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    device.Open()
    prefix = 'DISK_%d_' % idx
    env[prefix + 'PATH'] = device.dev_path
    env[prefix + 'ACCESS'] = disk.mode
    if constants.HV_DISK_TYPE in hvparams:
      env[prefix + 'FRONTEND_TYPE'] = hvparams[constants.HV_DISK_TYPE]
    # other device types get no BACKEND_TYPE variable at all
    if disk.dev_type in constants.LDS_BLOCK:
      env[prefix + 'BACKEND_TYPE'] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env[prefix + 'BACKEND_TYPE'] = 'file:%s' % disk.physical_id[0]

  for idx, nic in enumerate(instance.nics):
    prefix = 'NIC_%d_' % idx
    env[prefix + 'MAC'] = nic.mac
    if nic.ip:
      env[prefix + 'IP'] = nic.ip
    env[prefix + 'BRIDGE'] = nic.bridge
    if constants.HV_NIC_TYPE in hvparams:
      env[prefix + 'FRONTEND_TYPE'] = hvparams[constants.HV_NIC_TYPE]

  # export every backend and hypervisor parameter as a string
  for kind, params in (("BE", instance.beparams), ("HV", hvparams)):
    for key, value in params.items():
      env["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return env
1755
1756 -def BlockdevGrow(disk, amount):
1757 """Grow a stack of block devices. 1758 1759 This function is called recursively, with the childrens being the 1760 first ones to resize. 1761 1762 @type disk: L{objects.Disk} 1763 @param disk: the disk to be grown 1764 @rtype: (status, result) 1765 @return: a tuple with the status of the operation 1766 (True/False), and the errors message if status 1767 is False 1768 1769 """ 1770 r_dev = _RecursiveFindBD(disk) 1771 if r_dev is None: 1772 return False, "Cannot find block device %s" % (disk,) 1773 1774 try: 1775 r_dev.Grow(amount) 1776 except errors.BlockDeviceError, err: 1777 return False, str(err) 1778 1779 return True, None
1780
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually
  created just for the leaf lvm backend device: a disk with a single
  child delegates to it, a disk with several children delegates to
  the first child having the same size as the disk.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: the value returned by the device's Snapshot method
      (snapshot disk path), or None if the lvm device cannot be
      found or if no child matches the size of the disk
  @raise errors.ProgrammerError: for a disk without children which
      is not of type L{constants.LD_LV}

  """
  children = disk.children
  if children:
    if len(children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(children[0])
    # more than one child, choose one that matches
    for child in children:
      if child.size == disk.size:
        return BlockdevSnapshot(child)
    # no child has the size of the parent
    return None

  if disk.dev_type != constants.LD_LV:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))

  device = _RecursiveFindBD(disk)
  if device is None:
    return None
  # let's stay on the safe side and ask for the full size, for now
  return device.Snapshot(disk.size)
1815
1816 1817 -def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1818 """Export a block device snapshot to a remote node. 1819 1820 @type disk: L{objects.Disk} 1821 @param disk: the description of the disk to export 1822 @type dest_node: str 1823 @param dest_node: the destination node to export to 1824 @type instance: L{objects.Instance} 1825 @param instance: the instance object to whom the disk belongs 1826 @type cluster_name: str 1827 @param cluster_name: the cluster name, needed for SSH hostalias 1828 @type idx: int 1829 @param idx: the index of the disk in the instance's disk list, 1830 used to export to the OS scripts environment 1831 @rtype: boolean 1832 @return: the success of the operation 1833 1834 """ 1835 export_env = OSEnvironment(instance) 1836 1837 inst_os = OSFromDisk(instance.os) 1838 export_script = inst_os.export_script 1839 1840 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name, 1841 instance.name, int(time.time())) 1842 if not os.path.exists(constants.LOG_OS_DIR): 1843 os.mkdir(constants.LOG_OS_DIR, 0750) 1844 real_disk = _RecursiveFindBD(disk) 1845 if real_disk is None: 1846 raise errors.BlockDeviceError("Block device '%s' is not set up" % 1847 str(disk)) 1848 real_disk.Open() 1849 1850 export_env['EXPORT_DEVICE'] = real_disk.dev_path 1851 export_env['EXPORT_INDEX'] = str(idx) 1852 1853 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new") 1854 destfile = disk.physical_id[1] 1855 1856 # the target command is built out of three individual commands, 1857 # which are joined by pipes; we check each individual command for 1858 # valid parameters 1859 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s", 1860 inst_os.path, export_script, logfile) 1861 1862 comprcmd = "gzip" 1863 1864 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s", 1865 destdir, destdir, destfile) 1866 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node, 1867 constants.GANETI_RUNAS, 1868 destcmd) 1869 1870 # all commands have been checked, so we're safe to combine them 1871 
command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)]) 1872 1873 result = utils.RunCmd(["bash", "-c", command], env=export_env) 1874 1875 if result.failed: 1876 logging.error("os snapshot export command '%s' returned error: %s" 1877 " output: %s", command, result.fail_reason, result.output) 1878 return False 1879 1880 return True
1881
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  The configuration file is written into the temporary
  C{<instance name>.new} export directory, which then replaces any
  previous export directory of the instance.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file; false
      entries are skipped

  @rtype: boolean
  @return: the success of the operation

  """
  tmp_dir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  final_dir = os.path.join(constants.EXPORT_DIR, instance.name)

  exp_sect = constants.INISECT_EXP
  ins_sect = constants.INISECT_INS

  config = objects.SerializableConfigParser()

  config.add_section(exp_sect)
  config.set(exp_sect, 'version', '0')
  config.set(exp_sect, 'timestamp', '%d' % int(time.time()))
  config.set(exp_sect, 'source', instance.primary_node)
  config.set(exp_sect, 'os', instance.os)
  config.set(exp_sect, 'compression', 'gzip')

  config.add_section(ins_sect)
  config.set(ins_sect, 'name', instance.name)
  config.set(ins_sect, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(ins_sect, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(ins_sect, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_idx, nic in enumerate(instance.nics):
    nic_total = nic_idx + 1
    config.set(ins_sect, 'nic%d_mac' % nic_idx, '%s' % nic.mac)
    config.set(ins_sect, 'nic%d_ip' % nic_idx, '%s' % nic.ip)
    config.set(ins_sect, 'nic%d_bridge' % nic_idx, '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(ins_sect, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_idx, disk in enumerate(snap_disks):
    if not disk:
      # keys are numbered by position, so skipped entries leave gaps
      continue
    disk_total += 1
    config.set(ins_sect, 'disk%d_ivname' % disk_idx, '%s' % disk.iv_name)
    config.set(ins_sect, 'disk%d_dump' % disk_idx,
               '%s' % disk.physical_id[1])
    config.set(ins_sect, 'disk%d_size' % disk_idx, '%d' % disk.size)

  config.set(ins_sect, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(tmp_dir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # errors while removing the old export are ignored
  shutil.rmtree(final_dir, True)
  shutil.move(tmp_dir, final_dir)

  return True
1947
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if either the L{constants.INISECT_EXP}
      or the L{constants.INISECT_INS} section is missing

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      return None

  return config
1970
1971 1972 -def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1973 """Import an os image into an instance. 1974 1975 @type instance: L{objects.Instance} 1976 @param instance: instance to import the disks into 1977 @type src_node: string 1978 @param src_node: source node for the disk images 1979 @type src_images: list of string 1980 @param src_images: absolute paths of the disk images 1981 @rtype: list of boolean 1982 @return: each boolean represent the success of importing the n-th disk 1983 1984 """ 1985 import_env = OSEnvironment(instance) 1986 inst_os = OSFromDisk(instance.os) 1987 import_script = inst_os.import_script 1988 1989 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os, 1990 instance.name, int(time.time())) 1991 if not os.path.exists(constants.LOG_OS_DIR): 1992 os.mkdir(constants.LOG_OS_DIR, 0750) 1993 1994 comprcmd = "gunzip" 1995 impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path, 1996 import_script, logfile) 1997 1998 final_result = [] 1999 for idx, image in enumerate(src_images): 2000 if image: 2001 destcmd = utils.BuildShellCmd('cat %s', image) 2002 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node, 2003 constants.GANETI_RUNAS, 2004 destcmd) 2005 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) 2006 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx] 2007 import_env['IMPORT_INDEX'] = str(idx) 2008 result = utils.RunCmd(command, env=import_env) 2009 if result.failed: 2010 logging.error("Disk import command '%s' returned error: %s" 2011 " output: %s", command, result.fail_reason, 2012 result.output) 2013 final_result.append(False) 2014 else: 2015 final_result.append(True) 2016 else: 2017 final_result.append(True) 2018 2019 return final_result
2020
def ListExports():
  """List the exports currently present on this node.

  @rtype: list
  @return: the visible entries of L{constants.EXPORT_DIR}, or an empty
      list if that path is not a directory

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2033
def RemoveExport(export):
  """Delete an export directory from this node.

  The directory C{export} below L{constants.EXPORT_DIR} is removed
  recursively; any exception raised by C{shutil.rmtree} propagates to
  the caller.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: True (failures are signalled by exceptions, not by the
      return value)

  """
  export_path = os.path.join(constants.EXPORT_DIR, export)

  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(export_path)

  return True
2051
def BlockdevRename(devlist):
  """Rename a list of block devices.

  Devices that cannot be found, or whose rename raises
  L{errors.BlockDeviceError}, are recorded as failures, but the
  remaining entries are still processed.

  @type devlist: list of tuples
  @param devlist: list of two-element tuples C{(disk, unique_id)}; disk
      is an L{objects.Disk} object describing the current disk and
      unique_id is the new identifier handed to the device's C{Rename}
      method
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  all_ok = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      all_ok = False
      continue
    try:
      path_before = dev.dev_path
      dev.Rename(unique_id)
      path_after = dev.dev_path
      if path_before != path_after:
        DevCacheManager.RemoveCache(path_before)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(path_after, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      all_ok = False
  return all_ok
2087
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is the cluster-wide base
  file storage directory (as returned by the SimpleStore) or located
  below it. Only such paths are allowed.

  The comparison is done on whole path components: the candidate must
  be equal to the base directory or start with the base directory
  followed by a path separator. A plain C{os.path.commonprefix} check is
  not sufficient since it works character by character and would accept
  a sibling such as C{<base>-other}.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()

  # normalize the base as well so both sides are compared in the same
  # form (e.g. no trailing slash), then build the "below base" prefix
  base_norm = os.path.normpath(base_file_storage_dir)
  base_prefix = base_norm.rstrip(os.sep) + os.sep

  if not (file_storage_dir == base_norm or
          file_storage_dir.startswith(base_prefix)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
2112
2113 2114 -def CreateFileStorageDir(file_storage_dir):
2115 """Create file storage directory. 2116 2117 @type file_storage_dir: str 2118 @param file_storage_dir: directory to create 2119 2120 @rtype: tuple 2121 @return: tuple with first element a boolean indicating wheter dir 2122 creation was successful or not 2123 2124 """ 2125 file_storage_dir = _TransformFileStorageDir(file_storage_dir) 2126 result = True, 2127 if not file_storage_dir: 2128 result = False, 2129 else: 2130 if os.path.exists(file_storage_dir): 2131 if not os.path.isdir(file_storage_dir): 2132 logging.error("'%s' is not a directory", file_storage_dir) 2133 result = False, 2134 else: 2135 try: 2136 os.makedirs(file_storage_dir, 0750) 2137 except OSError, err: 2138 logging.error("Cannot create file storage directory '%s': %s", 2139 file_storage_dir, err) 2140 result = False, 2141 return result
2142
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return. A path
  that does not exist is treated as success, a path that exists but is
  not a directory as failure (without attempting the removal).

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    return (False,)

  if not os.path.exists(file_storage_dir):
    # nothing to remove
    return (True,)

  if not os.path.isdir(file_storage_dir):
    # do not fall through to rmdir: it could only fail again and would
    # log a second, misleading error for the same problem
    logging.error("'%s' is not a directory", file_storage_dir)
    return (False,)

  # deletes dir only if empty, otherwise we want to return False
  try:
    os.rmdir(file_storage_dir)
  except OSError:
    logging.exception("Cannot remove file storage directory '%s'",
                      file_storage_dir)
    return (False,)
  return (True,)
2173
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename a file storage directory.

  Both paths are validated through L{_TransformFileStorageDir}. The
  rename is only attempted if the new path does not exist yet; if the
  new path exists and the old one does not, nothing is done and success
  is reported.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
    return (False,)

  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return (False,)
    # only the target exists, nothing left to do
    return (True,)

  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return (False,)

  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return (False,)
  return (True,)
2210
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  The file name is normalized before the check (so C{..} components
  cannot be used to escape the queue directory) and it must start with
  the queue directory followed by a path separator. A plain
  C{os.path.commonprefix} comparison is not sufficient since it works
  character by character and would accept e.g. C{<queue_dir>-other/x}.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  queue_prefix = queue_dir.rstrip(os.sep) + os.sep
  result = os.path.normpath(file_name).startswith(queue_prefix)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
2229
def JobQueueUpdate(file_name, content):
  """Write new contents to a file in the queue directory.

  This wraps L{utils.WriteFile}; the target is first checked with
  L{_IsJobQueueFile} and the content is passed through L{_Decompress}
  before being written.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents, in the form expected by
      L{_Decompress}
  @rtype: boolean
  @return: False if the file name was rejected, True after the write

  """
  if _IsJobQueueFile(file_name):
    # Write and replace the file atomically
    utils.WriteFile(file_name, data=_Decompress(content))
    return True

  return False
2252
def JobQueueRename(old, new):
  """Rename a file inside the job queue directory.

  This wraps L{utils.RenameFile} (called with C{mkdir=True}); both
  names are first checked with L{_IsJobQueueFile}.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: False if one of the names was rejected, True after the rename

  """
  if not _IsJobQueueFile(old) or not _IsJobQueueFile(new):
    return False

  utils.RenameFile(old, new, mkdir=True)

  return True
2273
def JobQueueSetDrainFlag(drain_flag):
  """Set or clear the drain flag of the job queue.

  The flag is represented by the file
  L{constants.JOB_QUEUE_DRAIN_FILE}: it is written (empty) to set the
  flag and removed to reset it.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

  return True
2293
2294 2295 -def BlockdevClose(instance_name, disks):
2296 """Closes the given block devices. 2297 2298 This means they will be switched to secondary mode (in case of 2299 DRBD). 2300 2301 @param instance_name: if the argument is not empty, the symlinks 2302 of this instance will be removed 2303 @type disks: list of L{objects.Disk} 2304 @param disks: the list of disks to be closed 2305 @rtype: tuple (success, message) 2306 @return: a tuple of success and message, where success 2307 indicates the succes of the operation, and message 2308 which will contain the error details in case we 2309 failed 2310 2311 """ 2312 bdevs = [] 2313 for cf in disks: 2314 rd = _RecursiveFindBD(cf) 2315 if rd is None: 2316 return (False, "Can't find device %s" % cf) 2317 bdevs.append(rd) 2318 2319 msg = [] 2320 for rd in bdevs: 2321 try: 2322 rd.Close() 2323 except errors.BlockDeviceError, err: 2324 msg.append(str(err)) 2325 if msg: 2326 return (False, "Can't make devices secondary: %s" % ",".join(msg)) 2327 else: 2328 if instance_name: 2329 _RemoveBlockDevLinks(instance_name, disks) 2330 return (True, "All devices secondary")
2331
2332 2333 -def ValidateHVParams(hvname, hvparams):
2334 """Validates the given hypervisor parameters. 2335 2336 @type hvname: string 2337 @param hvname: the hypervisor name 2338 @type hvparams: dict 2339 @param hvparams: the hypervisor parameters to be validated 2340 @rtype: tuple (success, message) 2341 @return: a tuple of success and message, where success 2342 indicates the succes of the operation, and message 2343 which will contain the error details in case we 2344 failed 2345 2346 """ 2347 try: 2348 hv_type = hypervisor.GetHypervisor(hvname) 2349 hv_type.ValidateParameters(hvparams) 2350 return (True, "Validation passed") 2351 except errors.HypervisorError, err: 2352 return (False, str(err))
2353
2354 2355 -def DemoteFromMC():
2356 """Demotes the current node from master candidate role. 2357 2358 """ 2359 # try to ensure we're not the master by mistake 2360 master, myself = ssconf.GetMasterAndMyself() 2361 if master == myself: 2362 return (False, "ssconf status shows I'm the master node, will not demote") 2363 pid_file = utils.DaemonPidFileName(constants.MASTERD_PID) 2364 if utils.IsProcessAlive(utils.ReadPidFile(pid_file)): 2365 return (False, "The master daemon is running, will not demote") 2366 try: 2367 if os.path.isfile(constants.CLUSTER_CONF_FILE): 2368 utils.CreateBackup(constants.CLUSTER_CONF_FILE) 2369 except EnvironmentError, err: 2370 if err.errno != errno.ENOENT: 2371 return (False, "Error while backing up cluster file: %s" % str(err)) 2372 utils.RemoveFile(constants.CLUSTER_CONF_FILE) 2373 return (True, "Done")
2374
def _FindDisks(nodes_ip, disks):
  """Set the physical ID on disks and return the block devices.

  @param nodes_ip: passed, together with this node's name, to the
      C{SetPhysicalID} method of every disk
  @param disks: the disk objects to look up
  @rtype: tuple
  @return: C{(True, list of block devices)} if all devices were found,
      C{(False, error message)} for the first device that is missing

  """
  # set the correct physical ID on all disks before any lookup
  local_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(local_name, nodes_ip)

  found = []
  for disk in disks:
    bd = _RecursiveFindBD(disk)
    if bd is None:
      return (False, "Can't find device %s" % disk)
    found.append(bd)
  return (True, found)
2393
2394 2395 -def DrbdDisconnectNet(nodes_ip, disks):
2396 """Disconnects the network on a list of drbd devices. 2397 2398 """ 2399 status, bdevs = _FindDisks(nodes_ip, disks) 2400 if not status: 2401 return status, bdevs 2402 2403 # disconnect disks 2404 for rd in bdevs: 2405 try: 2406 rd.DisconnectNet() 2407 except errors.BlockDeviceError, err: 2408 logging.exception("Failed to go into standalone mode") 2409 return (False, "Can't change network configuration: %s" % str(err)) 2410 return (True, "All disks are now disconnected")
2411
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  The steps are, in order: look up the devices, create the instance
  symlinks (multimaster only), attach the network on every device, wait
  until all devices are connected or resyncing (re-attaching devices
  that went standalone), and finally open the devices (multimaster
  only).

  @param nodes_ip: passed through to L{_FindDisks}
  @param disks: the disks whose devices should be attached
  @param instance_name: the instance name used for the device symlinks
  @param multimaster: passed to the C{AttachNet} method of the devices;
      if true, the symlinks are created and the devices are opened
  @rtype: tuple (success, message)
  @return: the failure result of L{_FindDisks} if the lookup failed,
      otherwise a boolean and a message describing the outcome

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 milliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    time.sleep(sleep_time)
    # back off by a factor of 1.5 per round, capped at 5 seconds
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        return (False, "Can't change to primary mode: %s" % str(err))
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
2474
def DrbdWaitSync(nodes_ip, disks):
  """Check the synchronization status of a list of drbd devices.

  The status of every device is read once; the scan stops at the first
  device that is neither connected nor resyncing.

  @param nodes_ip: passed through to L{_FindDisks}
  @param disks: the disks whose devices should be checked
  @rtype: tuple
  @return: the failure result of L{_FindDisks} if the lookup failed,
      otherwise C{(success, (alldone, min_resync))} where success is
      False if a device was neither connected nor resyncing, alldone
      tells whether no inspected device was resyncing and min_resync is
      the lowest sync percentage seen (starting from 100)

  """
  found, payload = _FindDisks(nodes_ip, disks)
  if not found:
    return found, payload

  lowest_percent = 100
  everything_synced = True
  for bd in payload:
    stats = bd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # report what we have gathered up to the broken device
      return (False, (everything_synced, lowest_percent))
    everything_synced = everything_synced and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      lowest_percent = min(lowest_percent, stats.sync_percent)
  return (True, (everything_synced, lowest_percent))
2496
2497 2498 -class HooksRunner(object):
2499 """Hook runner. 2500 2501 This class is instantiated on the node side (ganeti-noded) and not 2502 on the master side. 2503 2504 """
2505 - def __init__(self, hooks_base_dir=None):
2506 """Constructor for hooks runner. 2507 2508 @type hooks_base_dir: str or None 2509 @param hooks_base_dir: if not None, this overrides the 2510 L{constants.HOOKS_BASE_DIR} (useful for unittests) 2511 2512 """ 2513 if hooks_base_dir is None: 2514 hooks_base_dir = constants.HOOKS_BASE_DIR 2515 # yeah, _BASE_DIR is not valid for attributes, we use it like a 2516 # constant 2517 self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2518 2519 @staticmethod
2520 - def ExecHook(script, env):
2521 """Exec one hook script. 2522 2523 @type script: str 2524 @param script: the full path to the script 2525 @type env: dict 2526 @param env: the environment with which to exec the script 2527 @rtype: tuple (success, message) 2528 @return: a tuple of success and message, where success 2529 indicates the succes of the operation, and message 2530 which will contain the error details in case we 2531 failed 2532 2533 """ 2534 # exec the process using subprocess and log the output 2535 fdstdin = None 2536 try: 2537 fdstdin = open("/dev/null", "r") 2538 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE, 2539 stderr=subprocess.STDOUT, close_fds=True, 2540 shell=False, cwd="/", env=env) 2541 output = "" 2542 try: 2543 output = child.stdout.read(4096) 2544 child.stdout.close() 2545 except EnvironmentError, err: 2546 output += "Hook script error: %s" % str(err) 2547 2548 while True: 2549 try: 2550 result = child.wait() 2551 break 2552 except EnvironmentError, err: 2553 if err.errno == errno.EINTR: 2554 continue 2555 raise 2556 finally: 2557 # try not to leak fds 2558 for fd in (fdstdin, ): 2559 if fd is not None: 2560 try: 2561 fd.close() 2562 except EnvironmentError, err: 2563 # just log the error 2564 #logging.exception("Error while closing fd %s", fd) 2565 pass 2566 2567 return result == 0, utils.SafeEncode(output.strip())
2568
2569 - def RunHooks(self, hpath, phase, env):
2570 """Run the scripts in the hooks directory. 2571 2572 @type hpath: str 2573 @param hpath: the path to the hooks directory which 2574 holds the scripts 2575 @type phase: str 2576 @param phase: either L{constants.HOOKS_PHASE_PRE} or 2577 L{constants.HOOKS_PHASE_POST} 2578 @type env: dict 2579 @param env: dictionary with the environment for the hook 2580 @rtype: list 2581 @return: list of 3-element tuples: 2582 - script path 2583 - script result, either L{constants.HKR_SUCCESS} or 2584 L{constants.HKR_FAIL} 2585 - output of the script 2586 2587 @raise errors.ProgrammerError: for invalid input 2588 parameters 2589 2590 """ 2591 if phase == constants.HOOKS_PHASE_PRE: 2592 suffix = "pre" 2593 elif phase == constants.HOOKS_PHASE_POST: 2594 suffix = "post" 2595 else: 2596 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase) 2597 rr = [] 2598 2599 subdir = "%s-%s.d" % (hpath, suffix) 2600 dir_name = "%s/%s" % (self._BASE_DIR, subdir) 2601 try: 2602 dir_contents = utils.ListVisibleFiles(dir_name) 2603 except OSError: 2604 # FIXME: must log output in case of failures 2605 return rr 2606 2607 # we use the standard python sort order, 2608 # so 00name is the recommended naming scheme 2609 dir_contents.sort() 2610 for relname in dir_contents: 2611 fname = os.path.join(dir_name, relname) 2612 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and 2613 constants.EXT_PLUGIN_MASK.match(relname) is not None): 2614 rrval = constants.HKR_SKIP 2615 output = "" 2616 else: 2617 result, output = self.ExecHook(fname, env) 2618 if not result: 2619 rrval = constants.HKR_FAIL 2620 else: 2621 rrval = constants.HKR_SUCCESS 2622 rr.append(("%s/%s" % (subdir, relname), rrval, output)) 2623 2624 return rr
2625
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    The input data is written to a temporary file whose name is passed
    as the only argument to the script; the temporary file is removed
    afterwards in all cases.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      # go through a file object so that the whole input is written
      # (a bare os.write may write only part of it) and make sure the
      # descriptor is closed even if the write fails
      fobj = os.fdopen(fd, "w")
      try:
        fobj.write(idata)
      finally:
        fobj.close()
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2667
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  Every device is represented by one file under L{_ROOT_DIR}; see
  L{_ConvertPath} for the naming scheme.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    Errors while writing the cache file are logged and otherwise
    ignored.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    # one line per device: owner, role and instance-visible name
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging. Errors while removing the cache file are
    logged and otherwise ignored.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # this is the removal path, so do not report it as an update
      logging.exception("Can't remove bdev cache for %s", dev_path)
2750