Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
  41  import os 
  42  import sys 
  43  import logging 
  44  import signal 
  45  import codecs 
  46   
  47  from optparse import OptionParser 
  48   
  49  from ganeti import backend 
  50  from ganeti import constants 
  51  from ganeti import objects 
  52  from ganeti import errors 
  53  from ganeti import jstore 
  54  from ganeti import daemon 
  55  from ganeti import http 
  56  from ganeti import utils 
  57  from ganeti.storage import container 
  58  from ganeti import serializer 
  59  from ganeti import netutils 
  60  from ganeti import pathutils 
  61  from ganeti import ssconf 
  62   
  63  import ganeti.http.server # pylint: disable=W0611 
  64   
  65   
  66  queue_lock = None 
def _extendReasonTrail(trail, source, reason=""):
  """Append a noded-specific entry to the given reason trail.

  The appended entry records which noded functionality handled the
  operation, the (optional) reason text and a nanosecond timestamp.

  @param trail: the reason trail (a list) to extend; must not be None
  @param source: name of the noded functionality generating the entry
  @param reason: optional free-form reason text

  """
  assert trail is not None
  entry = ("%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source),
           reason,
           utils.EpochNano())
  trail.append(entry)
77
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Idempotent: if the module-level C{queue_lock} is already set, this is
  a no-op. Errors are returned rather than raised so callers can decide
  how to react.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  # Already initialized by an earlier call
  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    # E.g. missing/unwritable queue directory; report instead of raising
    return err
96
def _RequireJobQueueLock(fn):
  """Decorator ensuring the job queue lock is held around C{fn}.

  The queue lock is lazily initialized on first use; acquisition is
  exclusive and blocking with a fixed timeout.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _LockedWrapper(*args, **kwargs):
    # A non-None result from _PrepareQueueLock is the error object
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _LockedWrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decode the I/O arguments of an import/export request.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: serialized I/O arguments
  @return: arguments with serialized disks turned into disk objects;
    unknown I/O types are passed through unmodified

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    return ieioargs
132
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock all process memory before the base class starts serving, so
    # request handling cannot be delayed by paging
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
159
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember the daemon's PID so request handlers can signal the process
    # itself (used for the SIGTERM shutdown path in HandleRequest)
    self.noded_pid = os.getpid()
173
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches the HTTP POST request to the C{perspective_*} method named
    after the request path, deserializing the JSON body as its parameters.

    @param req: the incoming HTTP request
    @return: JSON-serialized result; usually a (success, payload) tuple

    """

    # Only POST is part of the RPC protocol
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # The request path selects the handler method by naming convention
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # Catch-all boundary: report the failure to the caller instead of
      # letting the daemon's HTTP layer see an unhandled exception
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
211 212 # the new block devices -------------------------- 213 214 @staticmethod
215 - def perspective_blockdev_create(params):
216 """Create a block device. 217 218 """ 219 (bdev_s, size, owner, on_primary, info, excl_stor) = params 220 bdev = objects.Disk.FromDict(bdev_s) 221 if bdev is None: 222 raise ValueError("can't unserialize data!") 223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 224 excl_stor)
225 226 @staticmethod
228 """Pause/resume sync of a block device. 229 230 """ 231 disks_s, pause = params 232 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 233 return backend.BlockdevPauseResumeSync(disks, pause)
234 235 @staticmethod
236 - def perspective_blockdev_wipe(params):
237 """Wipe a block device. 238 239 """ 240 bdev_s, offset, size = params 241 bdev = objects.Disk.FromDict(bdev_s) 242 return backend.BlockdevWipe(bdev, offset, size)
243 244 @staticmethod
245 - def perspective_blockdev_remove(params):
246 """Remove a block device. 247 248 """ 249 bdev_s = params[0] 250 bdev = objects.Disk.FromDict(bdev_s) 251 return backend.BlockdevRemove(bdev)
252 253 @staticmethod
254 - def perspective_blockdev_rename(params):
255 """Remove a block device. 256 257 """ 258 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]] 259 return backend.BlockdevRename(devlist)
260 261 @staticmethod
262 - def perspective_blockdev_assemble(params):
263 """Assemble a block device. 264 265 """ 266 bdev_s, idict, on_primary, idx = params 267 bdev = objects.Disk.FromDict(bdev_s) 268 instance = objects.Instance.FromDict(idict) 269 if bdev is None: 270 raise ValueError("can't unserialize data!") 271 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
272 273 @staticmethod
274 - def perspective_blockdev_shutdown(params):
275 """Shutdown a block device. 276 277 """ 278 bdev_s = params[0] 279 bdev = objects.Disk.FromDict(bdev_s) 280 if bdev is None: 281 raise ValueError("can't unserialize data!") 282 return backend.BlockdevShutdown(bdev)
283 284 @staticmethod
286 """Add a child to a mirror device. 287 288 Note: this is only valid for mirror devices. It's the caller's duty 289 to send a correct disk, otherwise we raise an error. 290 291 """ 292 bdev_s, ndev_s = params 293 bdev = objects.Disk.FromDict(bdev_s) 294 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 295 if bdev is None or ndevs.count(None) > 0: 296 raise ValueError("can't unserialize data!") 297 return backend.BlockdevAddchildren(bdev, ndevs)
298 299 @staticmethod
301 """Remove a child from a mirror device. 302 303 This is only valid for mirror devices, of course. It's the callers 304 duty to send a correct disk, otherwise we raise an error. 305 306 """ 307 bdev_s, ndev_s = params 308 bdev = objects.Disk.FromDict(bdev_s) 309 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 310 if bdev is None or ndevs.count(None) > 0: 311 raise ValueError("can't unserialize data!") 312 return backend.BlockdevRemovechildren(bdev, ndevs)
313 314 @staticmethod
316 """Return the mirror status for a list of disks. 317 318 """ 319 disks = [objects.Disk.FromDict(dsk_s) 320 for dsk_s in params[0]] 321 return [status.ToDict() 322 for status in backend.BlockdevGetmirrorstatus(disks)]
323 324 @staticmethod
326 """Return the mirror status for a list of disks. 327 328 """ 329 (node_disks, ) = params 330 331 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 332 333 result = [] 334 335 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 336 if success: 337 result.append((success, status.ToDict())) 338 else: 339 result.append((success, status)) 340 341 return result
342 343 @staticmethod
344 - def perspective_blockdev_find(params):
345 """Expose the FindBlockDevice functionality for a disk. 346 347 This will try to find but not activate a disk. 348 349 """ 350 disk = objects.Disk.FromDict(params[0]) 351 352 result = backend.BlockdevFind(disk) 353 if result is None: 354 return None 355 356 return result.ToDict()
357 358 @staticmethod
359 - def perspective_blockdev_snapshot(params):
360 """Create a snapshot device. 361 362 Note that this is only valid for LVM disks, if we get passed 363 something else we raise an exception. The snapshot device can be 364 remove by calling the generic block device remove call. 365 366 """ 367 cfbd = objects.Disk.FromDict(params[0]) 368 return backend.BlockdevSnapshot(cfbd)
369 370 @staticmethod
371 - def perspective_blockdev_grow(params):
372 """Grow a stack of devices. 373 374 """ 375 if len(params) < 5: 376 raise ValueError("Received only %s parameters in blockdev_grow," 377 " old master?" % len(params)) 378 cfbd = objects.Disk.FromDict(params[0]) 379 amount = params[1] 380 dryrun = params[2] 381 backingstore = params[3] 382 excl_stor = params[4] 383 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
384 385 @staticmethod
386 - def perspective_blockdev_close(params):
387 """Closes the given block devices. 388 389 """ 390 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 391 return backend.BlockdevClose(params[0], disks)
392 393 @staticmethod
395 """Compute the sizes of the given block devices. 396 397 """ 398 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 399 return backend.BlockdevGetdimensions(disks)
400 401 @staticmethod
402 - def perspective_blockdev_setinfo(params):
403 """Sets metadata information on the given block device. 404 405 """ 406 (disk, info) = params 407 disk = objects.Disk.FromDict(disk) 408 return backend.BlockdevSetInfo(disk, info)
409 410 # blockdev/drbd specific methods ---------- 411 412 @staticmethod
414 """Disconnects the network connection of drbd disks. 415 416 Note that this is only valid for drbd disks, so the members of the 417 disk list must all be drbd devices. 418 419 """ 420 (disks,) = params 421 disks = [objects.Disk.FromDict(disk) for disk in disks] 422 return backend.DrbdDisconnectNet(disks)
423 424 @staticmethod
425 - def perspective_drbd_attach_net(params):
426 """Attaches the network connection of drbd disks. 427 428 Note that this is only valid for drbd disks, so the members of the 429 disk list must all be drbd devices. 430 431 """ 432 disks, instance_name, multimaster = params 433 disks = [objects.Disk.FromDict(disk) for disk in disks] 434 return backend.DrbdAttachNet(disks, instance_name, multimaster)
435 436 @staticmethod
437 - def perspective_drbd_wait_sync(params):
438 """Wait until DRBD disks are synched. 439 440 Note that this is only valid for drbd disks, so the members of the 441 disk list must all be drbd devices. 442 443 """ 444 (disks,) = params 445 disks = [objects.Disk.FromDict(disk) for disk in disks] 446 return backend.DrbdWaitSync(disks)
447 448 @staticmethod
450 """Checks if the drbd devices need activation 451 452 Note that this is only valid for drbd disks, so the members of the 453 disk list must all be drbd devices. 454 455 """ 456 (disks,) = params 457 disks = [objects.Disk.FromDict(disk) for disk in disks] 458 return backend.DrbdNeedsActivation(disks)
459 460 @staticmethod
462 """Query drbd helper. 463 464 """ 465 return backend.GetDrbdUsermodeHelper()
466 467 # export/import -------------------------- 468 469 @staticmethod
470 - def perspective_finalize_export(params):
471 """Expose the finalize export functionality. 472 473 """ 474 instance = objects.Instance.FromDict(params[0]) 475 476 snap_disks = [] 477 for disk in params[1]: 478 if isinstance(disk, bool): 479 snap_disks.append(disk) 480 else: 481 snap_disks.append(objects.Disk.FromDict(disk)) 482 483 return backend.FinalizeExport(instance, snap_disks)
484 485 @staticmethod
486 - def perspective_export_info(params):
487 """Query information about an existing export on this node. 488 489 The given path may not contain an export, in which case we return 490 None. 491 492 """ 493 path = params[0] 494 return backend.ExportInfo(path)
495 496 @staticmethod
497 - def perspective_export_list(params):
498 """List the available exports on this node. 499 500 Note that as opposed to export_info, which may query data about an 501 export in any path, this only queries the standard Ganeti path 502 (pathutils.EXPORT_DIR). 503 504 """ 505 return backend.ListExports()
506 507 @staticmethod
508 - def perspective_export_remove(params):
509 """Remove an export. 510 511 """ 512 export = params[0] 513 return backend.RemoveExport(export)
514 515 # block device --------------------- 516 @staticmethod
517 - def perspective_bdev_sizes(params):
518 """Query the list of block devices 519 520 """ 521 devices = params[0] 522 return backend.GetBlockDevSizes(devices)
523 524 # volume -------------------------- 525 526 @staticmethod
527 - def perspective_lv_list(params):
528 """Query the list of logical volumes in a given volume group. 529 530 """ 531 vgname = params[0] 532 return backend.GetVolumeList(vgname)
533 534 @staticmethod
535 - def perspective_vg_list(params):
536 """Query the list of volume groups. 537 538 """ 539 return backend.ListVolumeGroups()
540 541 # Storage -------------------------- 542 543 @staticmethod
544 - def perspective_storage_list(params):
545 """Get list of storage units. 546 547 """ 548 (su_name, su_args, name, fields) = params 549 return container.GetStorage(su_name, *su_args).List(name, fields)
550 551 @staticmethod
552 - def perspective_storage_modify(params):
553 """Modify a storage unit. 554 555 """ 556 (su_name, su_args, name, changes) = params 557 return container.GetStorage(su_name, *su_args).Modify(name, changes)
558 559 @staticmethod
560 - def perspective_storage_execute(params):
561 """Execute an operation on a storage unit. 562 563 """ 564 (su_name, su_args, name, op) = params 565 return container.GetStorage(su_name, *su_args).Execute(name, op)
566 567 # bridge -------------------------- 568 569 @staticmethod
570 - def perspective_bridges_exist(params):
571 """Check if all bridges given exist on this node. 572 573 """ 574 bridges_list = params[0] 575 return backend.BridgesExist(bridges_list)
576 577 # instance -------------------------- 578 579 @staticmethod
580 - def perspective_instance_os_add(params):
581 """Install an OS on a given instance. 582 583 """ 584 inst_s = params[0] 585 inst = objects.Instance.FromDict(inst_s) 586 reinstall = params[1] 587 debug = params[2] 588 return backend.InstanceOsAdd(inst, reinstall, debug)
589 590 @staticmethod
592 """Runs the OS rename script for an instance. 593 594 """ 595 inst_s, old_name, debug = params 596 inst = objects.Instance.FromDict(inst_s) 597 return backend.RunRenameInstance(inst, old_name, debug)
598 599 @staticmethod
600 - def perspective_instance_shutdown(params):
601 """Shutdown an instance. 602 603 """ 604 instance = objects.Instance.FromDict(params[0]) 605 timeout = params[1] 606 trail = params[2] 607 _extendReasonTrail(trail, "shutdown") 608 return backend.InstanceShutdown(instance, timeout, trail)
609 610 @staticmethod
611 - def perspective_instance_start(params):
612 """Start an instance. 613 614 """ 615 (instance_name, startup_paused, trail) = params 616 instance = objects.Instance.FromDict(instance_name) 617 _extendReasonTrail(trail, "start") 618 return backend.StartInstance(instance, startup_paused, trail)
619 620 @staticmethod
621 - def perspective_hotplug_device(params):
622 """Hotplugs device to a running instance. 623 624 """ 625 (idict, action, dev_type, ddict, extra, seq) = params 626 instance = objects.Instance.FromDict(idict) 627 if dev_type == constants.HOTPLUG_TARGET_DISK: 628 device = objects.Disk.FromDict(ddict) 629 elif dev_type == constants.HOTPLUG_TARGET_NIC: 630 device = objects.NIC.FromDict(ddict) 631 else: 632 assert dev_type in constants.HOTPLUG_ALL_TARGETS 633 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
634 635 @staticmethod
636 - def perspective_hotplug_supported(params):
637 """Checks if hotplug is supported. 638 639 """ 640 instance = objects.Instance.FromDict(params[0]) 641 return backend.HotplugSupported(instance)
642 643 @staticmethod
644 - def perspective_migration_info(params):
645 """Gather information about an instance to be migrated. 646 647 """ 648 instance = objects.Instance.FromDict(params[0]) 649 return backend.MigrationInfo(instance)
650 651 @staticmethod
652 - def perspective_accept_instance(params):
653 """Prepare the node to accept an instance. 654 655 """ 656 instance, info, target = params 657 instance = objects.Instance.FromDict(instance) 658 return backend.AcceptInstance(instance, info, target)
659 660 @staticmethod
662 """Finalize the instance migration on the destination node. 663 664 """ 665 instance, info, success = params 666 instance = objects.Instance.FromDict(instance) 667 return backend.FinalizeMigrationDst(instance, info, success)
668 669 @staticmethod
670 - def perspective_instance_migrate(params):
671 """Migrates an instance. 672 673 """ 674 cluster_name, instance, target, live = params 675 instance = objects.Instance.FromDict(instance) 676 return backend.MigrateInstance(cluster_name, instance, target, live)
677 678 @staticmethod
680 """Finalize the instance migration on the source node. 681 682 """ 683 instance, success, live = params 684 instance = objects.Instance.FromDict(instance) 685 return backend.FinalizeMigrationSource(instance, success, live)
686 687 @staticmethod
689 """Reports migration status. 690 691 """ 692 instance = objects.Instance.FromDict(params[0]) 693 return backend.GetMigrationStatus(instance).ToDict()
694 695 @staticmethod
696 - def perspective_instance_reboot(params):
697 """Reboot an instance. 698 699 """ 700 instance = objects.Instance.FromDict(params[0]) 701 reboot_type = params[1] 702 shutdown_timeout = params[2] 703 trail = params[3] 704 _extendReasonTrail(trail, "reboot") 705 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 706 trail)
707 708 @staticmethod
710 """Modify instance runtime memory. 711 712 """ 713 instance_dict, memory = params 714 instance = objects.Instance.FromDict(instance_dict) 715 return backend.InstanceBalloonMemory(instance, memory)
716 717 @staticmethod
718 - def perspective_instance_info(params):
719 """Query instance information. 720 721 """ 722 (instance_name, hypervisor_name, hvparams) = params 723 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
724 725 @staticmethod
727 """Query whether the specified instance can be migrated. 728 729 """ 730 instance = objects.Instance.FromDict(params[0]) 731 return backend.GetInstanceMigratable(instance)
732 733 @staticmethod
735 """Query information about all instances. 736 737 """ 738 (hypervisor_list, all_hvparams) = params 739 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
740 741 @staticmethod
743 """Query information on how to get console access to instances 744 745 """ 746 return backend.GetInstanceConsoleInfo(params)
747 748 @staticmethod
749 - def perspective_instance_list(params):
750 """Query the list of running instances. 751 752 """ 753 (hypervisor_list, hvparams) = params 754 return backend.GetInstanceList(hypervisor_list, hvparams)
755 756 # node -------------------------- 757 758 @staticmethod
760 """Checks if a node has the given ip address. 761 762 """ 763 return netutils.IPAddress.Own(params[0])
764 765 @staticmethod
766 - def perspective_node_info(params):
767 """Query node information. 768 769 """ 770 (storage_units, hv_specs) = params 771 return backend.GetNodeInfo(storage_units, hv_specs)
772 773 @staticmethod
774 - def perspective_etc_hosts_modify(params):
775 """Modify a node entry in /etc/hosts. 776 777 """ 778 backend.EtcHostsModify(params[0], params[1], params[2]) 779 780 return True
781 782 @staticmethod
783 - def perspective_node_verify(params):
784 """Run a verify sequence on this node. 785 786 """ 787 (what, cluster_name, hvparams, node_groups, groups_cfg) = params 788 return backend.VerifyNode(what, cluster_name, hvparams, 789 node_groups, groups_cfg)
790 791 @classmethod
792 - def perspective_node_verify_light(cls, params):
793 """Run a light verify sequence on this node. 794 795 This call is meant to perform a less strict verification of the node in 796 certain situations. Right now, it is invoked only when a node is just about 797 to be added to a cluster, and even then, it performs the same checks as 798 L{perspective_node_verify}. 799 """ 800 return cls.perspective_node_verify(params)
801 802 @staticmethod
804 """Start the master daemons on this node. 805 806 """ 807 return backend.StartMasterDaemons(params[0])
808 809 @staticmethod
811 """Activate the master IP on this node. 812 813 """ 814 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 815 return backend.ActivateMasterIp(master_params, params[1])
816 817 @staticmethod
819 """Deactivate the master IP on this node. 820 821 """ 822 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 823 return backend.DeactivateMasterIp(master_params, params[1])
824 825 @staticmethod
826 - def perspective_node_stop_master(params):
827 """Stops master daemons on this node. 828 829 """ 830 return backend.StopMasterDaemons()
831 832 @staticmethod
834 """Change the master IP netmask. 835 836 """ 837 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 838 params[3])
839 840 @staticmethod
842 """Cleanup after leaving a cluster. 843 844 """ 845 return backend.LeaveCluster(params[0])
846 847 @staticmethod
848 - def perspective_node_volumes(params):
849 """Query the list of all logical volume groups. 850 851 """ 852 return backend.NodeVolumes()
853 854 @staticmethod
856 """Demote a node from the master candidate role. 857 858 """ 859 return backend.DemoteFromMC()
860 861 @staticmethod
862 - def perspective_node_powercycle(params):
863 """Tries to powercycle the node. 864 865 """ 866 (hypervisor_type, hvparams) = params 867 return backend.PowercycleNode(hypervisor_type, hvparams)
868 869 @staticmethod
871 """Sets up OpenvSwitch on the node. 872 873 """ 874 (ovs_name, ovs_link) = params 875 return backend.ConfigureOVS(ovs_name, ovs_link)
876 877 @staticmethod
879 """Gets the node's public crypto tokens. 880 881 """ 882 token_requests = params[0] 883 return backend.GetCryptoTokens(token_requests)
884 885 @staticmethod
887 """Ensure daemon is running. 888 889 """ 890 (daemon_name, run) = params 891 return backend.EnsureDaemon(daemon_name, run)
892 893 # cluster -------------------------- 894 895 @staticmethod
896 - def perspective_version(params):
897 """Query version information. 898 899 """ 900 return constants.PROTOCOL_VERSION
901 902 @staticmethod
903 - def perspective_upload_file(params):
904 """Upload a file. 905 906 Note that the backend implementation imposes strict rules on which 907 files are accepted. 908 909 """ 910 return backend.UploadFile(*(params[0]))
911 912 @staticmethod
913 - def perspective_master_node_name(params):
914 """Returns the master node name. 915 916 """ 917 return backend.GetMasterNodeName()
918 919 @staticmethod
920 - def perspective_run_oob(params):
921 """Runs oob on node. 922 923 """ 924 output = backend.RunOob(params[0], params[1], params[2], params[3]) 925 if output: 926 result = serializer.LoadJson(output) 927 else: 928 result = None 929 return result
930 931 @staticmethod
933 """Runs a restricted command. 934 935 """ 936 (cmd, ) = params 937 938 return backend.RunRestrictedCmd(cmd)
939 940 @staticmethod
942 """Write ssconf files. 943 944 """ 945 (values,) = params 946 return ssconf.WriteSsconfFiles(values)
947 948 @staticmethod
949 - def perspective_get_watcher_pause(params):
950 """Get watcher pause end. 951 952 """ 953 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
954 955 @staticmethod
956 - def perspective_set_watcher_pause(params):
957 """Set watcher pause. 958 959 """ 960 (until, ) = params 961 return backend.SetWatcherPause(until)
962 963 # os ----------------------- 964 965 @staticmethod
966 - def perspective_os_diagnose(params):
967 """Query detailed information about existing OSes. 968 969 """ 970 return backend.DiagnoseOS()
971 972 @staticmethod
973 - def perspective_os_get(params):
974 """Query information about a given OS. 975 976 """ 977 name = params[0] 978 os_obj = backend.OSFromDisk(name) 979 return os_obj.ToDict()
980 981 @staticmethod
982 - def perspective_os_validate(params):
983 """Run a given OS' validation routine. 984 985 """ 986 required, name, checks, params = params 987 return backend.ValidateOS(required, name, checks, params)
988 989 # extstorage ----------------------- 990 991 @staticmethod
993 """Query detailed information about existing extstorage providers. 994 995 """ 996 return backend.DiagnoseExtStorage()
997 998 # hooks ----------------------- 999 1000 @staticmethod
1001 - def perspective_hooks_runner(params):
1002 """Run hook scripts. 1003 1004 """ 1005 hpath, phase, env = params 1006 hr = backend.HooksRunner() 1007 return hr.RunHooks(hpath, phase, env)
1008 1009 # iallocator ----------------- 1010 1011 @staticmethod
1012 - def perspective_iallocator_runner(params):
1013 """Run an iallocator script. 1014 1015 """ 1016 name, idata, ial_params_dict = params 1017 ial_params = [] 1018 for ial_param in ial_params_dict.items(): 1019 ial_params.append("--" + ial_param[0] + "=" + ial_param[1]) 1020 iar = backend.IAllocatorRunner() 1021 return iar.Run(name, idata, ial_params)
1022 1023 # test ----------------------- 1024 1025 @staticmethod
1026 - def perspective_test_delay(params):
1027 """Run test delay. 1028 1029 """ 1030 duration = params[0] 1031 status, rval = utils.TestDelay(duration) 1032 if not status: 1033 raise backend.RPCFail(rval) 1034 return rval
1035 1036 # file storage --------------- 1037 1038 @staticmethod
1040 """Create the file storage directory. 1041 1042 """ 1043 file_storage_dir = params[0] 1044 return backend.CreateFileStorageDir(file_storage_dir)
1045 1046 @staticmethod
1048 """Remove the file storage directory. 1049 1050 """ 1051 file_storage_dir = params[0] 1052 return backend.RemoveFileStorageDir(file_storage_dir)
1053 1054 @staticmethod
1056 """Rename the file storage directory. 1057 1058 """ 1059 old_file_storage_dir = params[0] 1060 new_file_storage_dir = params[1] 1061 return backend.RenameFileStorageDir(old_file_storage_dir, 1062 new_file_storage_dir)
1063 1064 # jobs ------------------------ 1065 1066 @staticmethod 1067 @_RequireJobQueueLock
1068 - def perspective_jobqueue_update(params):
1069 """Update job queue. 1070 1071 """ 1072 (file_name, content) = params 1073 return backend.JobQueueUpdate(file_name, content)
1074 1075 @staticmethod 1076 @_RequireJobQueueLock
1077 - def perspective_jobqueue_purge(params):
1078 """Purge job queue. 1079 1080 """ 1081 return backend.JobQueuePurge()
1082 1083 @staticmethod 1084 @_RequireJobQueueLock
1085 - def perspective_jobqueue_rename(params):
1086 """Rename a job queue file. 1087 1088 """ 1089 # TODO: What if a file fails to rename? 1090 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1091 1092 @staticmethod 1093 @_RequireJobQueueLock
1095 """Set job queue's drain flag. 1096 1097 """ 1098 (flag, ) = params 1099 1100 return jstore.SetDrainFlag(flag)
1101 1102 # hypervisor --------------- 1103 1104 @staticmethod
1106 """Validate the hypervisor parameters. 1107 1108 """ 1109 (hvname, hvparams) = params 1110 return backend.ValidateHVParams(hvname, hvparams)
1111 1112 # Crypto 1113 1114 @staticmethod
1115 - def perspective_x509_cert_create(params):
1116 """Creates a new X509 certificate for SSL/TLS. 1117 1118 """ 1119 (validity, ) = params 1120 return backend.CreateX509Certificate(validity)
1121 1122 @staticmethod
1123 - def perspective_x509_cert_remove(params):
1124 """Removes a X509 certificate. 1125 1126 """ 1127 (name, ) = params 1128 return backend.RemoveX509Certificate(name)
1129 1130 # Import and export 1131 1132 @staticmethod
1133 - def perspective_import_start(params):
1134 """Starts an import daemon. 1135 1136 """ 1137 (opts_s, instance, component, (dest, dest_args)) = params 1138 1139 opts = objects.ImportExportOptions.FromDict(opts_s) 1140 1141 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1142 None, None, 1143 objects.Instance.FromDict(instance), 1144 component, dest, 1145 _DecodeImportExportIO(dest, 1146 dest_args))
1147 1148 @staticmethod
1149 - def perspective_export_start(params):
1150 """Starts an export daemon. 1151 1152 """ 1153 (opts_s, host, port, instance, component, (source, source_args)) = params 1154 1155 opts = objects.ImportExportOptions.FromDict(opts_s) 1156 1157 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1158 host, port, 1159 objects.Instance.FromDict(instance), 1160 component, source, 1161 _DecodeImportExportIO(source, 1162 source_args))
1163 1164 @staticmethod
1165 - def perspective_impexp_status(params):
1166 """Retrieves the status of an import or export daemon. 1167 1168 """ 1169 return backend.GetImportExportStatus(params[0])
1170 1171 @staticmethod
1172 - def perspective_impexp_abort(params):
1173 """Aborts an import or export. 1174 1175 """ 1176 return backend.AbortImportExport(params[0])
1177 1178 @staticmethod
1179 - def perspective_impexp_cleanup(params):
1180 """Cleans up after an import or export. 1181 1182 """ 1183 return backend.CleanupImportExport(params[0])
1184
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with L{constants.EXIT_FAILURE} if positional arguments were given
  or if the "string-escape" codec is unavailable (a sign of a broken
  Python installation); otherwise returns normally.

  """
  if args:
    # noded does not accept any positional arguments
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                     sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    sys.stderr.write("Can't load the string-escape code which is part"
                     " of the Python installation. Is your installation"
                     " complete/correct? Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)
1201
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback verifying a peer certificate against the candidate cert map.

  A peer is normally accepted only when the SHA1 digest of its certificate
  appears in the cluster's master candidate certificate map. During cluster
  init and upgrade that map is not usable yet: RPC calls are made to the
  master node before the candidate list exists, so the map is either empty
  or cannot be read from ssconf at all. In that bootstrap/upgrade case we
  instead compare the peer against the digest of our own server
  certificate, which keeps pre-2.11-style (server-certificate-only)
  operation working. After client certificates have been generated with
  ``gnt-cluster renew-crypto --new-node-certificates``, verification is
  done against the candidate map only.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613
  _BOOTSTRAP = "bootstrap"
  try:
    candidate_certs = ssconf.SimpleStore().GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None
  if not candidate_certs:
    # Bootstrap/upgrade phase: fall back to our own server certificate
    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    candidate_certs = {_BOOTSTRAP: server_digest}
  return cert.digest("sha1") in candidate_certs.values()
  # pylint: enable=W0613
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: (mainloop, server) tuple, consumed by L{ExecNoded}

  """
  # Use the mlock'ing request executor only when requested and when the
  # memory lock can actually be taken; otherwise keep the plain one.
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  queue_error = _PrepareQueueLock()
  if queue_error is not None:
    # this might be some kind of file-system/permission error; while this
    # breaks the job queue functionality, we shouldn't prevent startup of
    # the whole node daemon because of it
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_error)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = http.server.HttpServer(
    mainloop, options.bind_address, options.port, handler,
    ssl_params=ssl_params, ssl_verify_peer=True,
    request_executor_class=request_executor_class,
    ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)
1289
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the mainloop prepared by L{PrepNoded} and guarantees that the HTTP
  server is stopped on the way out, even if the loop raises.

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()
def Main():
  """Main function for the node daemon.

  Builds the option parser and delegates the daemon lifecycle to
  L{daemon.GenericMain} with L{CheckNoded}, L{PrepNoded} and L{ExecNoded}
  as the check/prepare/execute callbacks.

  """
  parser = OptionParser(
    description="Ganeti node daemon",
    usage="%prog [-f] [-d] [-p port] [-b ADDRESS] [-i INTERFACE]",
    version="%%prog (ganeti) %s" % constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)
1319