Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
  64   
  65   
  66  queue_lock = None 
67 68 69 -def _extendReasonTrail(trail, source, reason=""):
70 """Extend the reason trail with noded information 71 72 The trail is extended by appending the name of the noded functionality 73 """ 74 assert trail is not None 75 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source) 76 trail.append((trail_source, reason, utils.EpochNano()))
77
78 79 -def _PrepareQueueLock():
80 """Try to prepare the queue lock. 81 82 @return: None for success, otherwise an exception object 83 84 """ 85 global queue_lock # pylint: disable=W0603 86 87 if queue_lock is not None: 88 return None 89 90 # Prepare job queue 91 try: 92 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 93 return None 94 except EnvironmentError, err: 95 return err
96
97 98 -def _RequireJobQueueLock(fn):
99 """Decorator for job queue manipulating functions. 100 101 """ 102 QUEUE_LOCK_TIMEOUT = 10 103 104 def wrapper(*args, **kwargs): 105 # Locking in exclusive, blocking mode because there could be several 106 # children running at the same time. Waiting up to 10 seconds. 107 if _PrepareQueueLock() is not None: 108 raise errors.JobQueueError("Job queue failed initialization," 109 " cannot update jobs") 110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) 111 try: 112 return fn(*args, **kwargs) 113 finally: 114 queue_lock.Unlock()
115 116 return wrapper 117
118 119 -def _DecodeImportExportIO(ieio, ieioargs):
120 """Decodes import/export I/O information. 121 122 """ 123 if ieio == constants.IEIO_RAW_DISK: 124 assert len(ieioargs) == 1 125 return (objects.Disk.FromDict(ieioargs[0]), ) 126 127 if ieio == constants.IEIO_SCRIPT: 128 assert len(ieioargs) == 2 129 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1]) 130 131 return ieioargs
132
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
150 151 -class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
152 """Subclass ensuring request handlers are locked in RAM. 153 154 """
155 - def __init__(self, *args, **kwargs):
156 utils.Mlockall() 157 158 http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
159
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
170 - def __init__(self):
171 http.server.HttpServerHandler.__init__(self) 172 self.noded_pid = os.getpid()
173
174 - def HandleRequest(self, req):
175 """Handle a request. 176 177 """ 178 179 if req.request_method.upper() != http.HTTP_POST: 180 raise http.HttpBadRequest("Only the POST method is supported") 181 182 path = req.request_path 183 if path.startswith("/"): 184 path = path[1:] 185 186 method = getattr(self, "perspective_%s" % path, None) 187 if method is None: 188 raise http.HttpNotFound() 189 190 try: 191 result = (True, method(serializer.LoadJson(req.request_body))) 192 193 except backend.RPCFail, err: 194 # our custom failure exception; str(err) works fine if the 195 # exception was constructed with a single argument, and in 196 # this case, err.message == err.args[0] == str(err) 197 result = (False, str(err)) 198 except errors.QuitGanetiException, err: 199 # Tell parent to quit 200 logging.info("Shutting down the node daemon, arguments: %s", 201 str(err.args)) 202 os.kill(self.noded_pid, signal.SIGTERM) 203 # And return the error's arguments, which must be already in 204 # correct tuple format 205 result = err.args 206 except Exception, err: 207 logging.exception("Error in RPC call") 208 result = (False, "Error while executing backend function: %s" % str(err)) 209 210 return serializer.DumpJson(result)
211 212 # the new block devices -------------------------- 213 214 @staticmethod
215 - def perspective_blockdev_create(params):
216 """Create a block device. 217 218 """ 219 (bdev_s, size, owner, on_primary, info, excl_stor) = params 220 bdev = objects.Disk.FromDict(bdev_s) 221 if bdev is None: 222 raise ValueError("can't unserialize data!") 223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 224 excl_stor)
225 226 @staticmethod
227 - def perspective_blockdev_convert(params):
228 """Copy data from source block device to target. 229 230 """ 231 disk_src, disk_dest = params 232 bdev_src = objects.Disk.FromDict(disk_src) 233 bdev_dest = objects.Disk.FromDict(disk_dest) 234 return backend.BlockdevConvert(bdev_src, bdev_dest)
235 236 @staticmethod
238 """Pause/resume sync of a block device. 239 240 """ 241 disks_s, pause = params 242 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 243 return backend.BlockdevPauseResumeSync(disks, pause)
244 245 @staticmethod
246 - def perspective_blockdev_image(params):
247 """Image a block device. 248 249 """ 250 bdev_s, image, size = params 251 bdev = objects.Disk.FromDict(bdev_s) 252 return backend.BlockdevImage(bdev, image, size)
253 254 @staticmethod
255 - def perspective_blockdev_wipe(params):
256 """Wipe a block device. 257 258 """ 259 bdev_s, offset, size = params 260 bdev = objects.Disk.FromDict(bdev_s) 261 return backend.BlockdevWipe(bdev, offset, size)
262 263 @staticmethod
264 - def perspective_blockdev_remove(params):
265 """Remove a block device. 266 267 """ 268 bdev_s = params[0] 269 bdev = objects.Disk.FromDict(bdev_s) 270 return backend.BlockdevRemove(bdev)
271 272 @staticmethod
273 - def perspective_blockdev_rename(params):
274 """Remove a block device. 275 276 """ 277 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]] 278 return backend.BlockdevRename(devlist)
279 280 @staticmethod
281 - def perspective_blockdev_assemble(params):
282 """Assemble a block device. 283 284 """ 285 bdev_s, idict, on_primary, idx = params 286 bdev = objects.Disk.FromDict(bdev_s) 287 instance = objects.Instance.FromDict(idict) 288 if bdev is None: 289 raise ValueError("can't unserialize data!") 290 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
291 292 @staticmethod
293 - def perspective_blockdev_shutdown(params):
294 """Shutdown a block device. 295 296 """ 297 bdev_s = params[0] 298 bdev = objects.Disk.FromDict(bdev_s) 299 if bdev is None: 300 raise ValueError("can't unserialize data!") 301 return backend.BlockdevShutdown(bdev)
302 303 @staticmethod
305 """Add a child to a mirror device. 306 307 Note: this is only valid for mirror devices. It's the caller's duty 308 to send a correct disk, otherwise we raise an error. 309 310 """ 311 bdev_s, ndev_s = params 312 bdev = objects.Disk.FromDict(bdev_s) 313 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 314 if bdev is None or ndevs.count(None) > 0: 315 raise ValueError("can't unserialize data!") 316 return backend.BlockdevAddchildren(bdev, ndevs)
317 318 @staticmethod
320 """Remove a child from a mirror device. 321 322 This is only valid for mirror devices, of course. It's the callers 323 duty to send a correct disk, otherwise we raise an error. 324 325 """ 326 bdev_s, ndev_s = params 327 bdev = objects.Disk.FromDict(bdev_s) 328 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 329 if bdev is None or ndevs.count(None) > 0: 330 raise ValueError("can't unserialize data!") 331 return backend.BlockdevRemovechildren(bdev, ndevs)
332 333 @staticmethod
335 """Return the mirror status for a list of disks. 336 337 """ 338 disks = [objects.Disk.FromDict(dsk_s) 339 for dsk_s in params[0]] 340 return [status.ToDict() 341 for status in backend.BlockdevGetmirrorstatus(disks)]
342 343 @staticmethod
345 """Return the mirror status for a list of disks. 346 347 """ 348 (node_disks, ) = params 349 350 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 351 352 result = [] 353 354 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 355 if success: 356 result.append((success, status.ToDict())) 357 else: 358 result.append((success, status)) 359 360 return result
361 362 @staticmethod
363 - def perspective_blockdev_find(params):
364 """Expose the FindBlockDevice functionality for a disk. 365 366 This will try to find but not activate a disk. 367 368 """ 369 disk = objects.Disk.FromDict(params[0]) 370 371 result = backend.BlockdevFind(disk) 372 if result is None: 373 return None 374 375 return result.ToDict()
376 377 @staticmethod
378 - def perspective_blockdev_snapshot(params):
379 """Create a snapshot device. 380 381 Note that this is only valid for LVM and ExtStorage disks, if we get passed 382 something else we raise an exception. The snapshot device can be 383 remove by calling the generic block device remove call. 384 385 """ 386 (disk, snap_name, snap_size) = params 387 cfbd = objects.Disk.FromDict(disk) 388 return backend.BlockdevSnapshot(cfbd, snap_name, snap_size)
389 390 @staticmethod
391 - def perspective_blockdev_grow(params):
392 """Grow a stack of devices. 393 394 """ 395 if len(params) < 5: 396 raise ValueError("Received only %s parameters in blockdev_grow," 397 " old master?" % len(params)) 398 cfbd = objects.Disk.FromDict(params[0]) 399 amount = params[1] 400 dryrun = params[2] 401 backingstore = params[3] 402 excl_stor = params[4] 403 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
404 405 @staticmethod
406 - def perspective_blockdev_close(params):
407 """Closes the given block devices. 408 409 """ 410 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 411 return backend.BlockdevClose(params[0], disks)
412 413 @staticmethod
415 """Compute the sizes of the given block devices. 416 417 """ 418 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 419 return backend.BlockdevGetdimensions(disks)
420 421 @staticmethod
422 - def perspective_blockdev_setinfo(params):
423 """Sets metadata information on the given block device. 424 425 """ 426 (disk, info) = params 427 disk = objects.Disk.FromDict(disk) 428 return backend.BlockdevSetInfo(disk, info)
429 430 # blockdev/drbd specific methods ---------- 431 432 @staticmethod
434 """Disconnects the network connection of drbd disks. 435 436 Note that this is only valid for drbd disks, so the members of the 437 disk list must all be drbd devices. 438 439 """ 440 (disks,) = params 441 disks = [objects.Disk.FromDict(disk) for disk in disks] 442 return backend.DrbdDisconnectNet(disks)
443 444 @staticmethod
445 - def perspective_drbd_attach_net(params):
446 """Attaches the network connection of drbd disks. 447 448 Note that this is only valid for drbd disks, so the members of the 449 disk list must all be drbd devices. 450 451 """ 452 disks, instance_name, multimaster = params 453 disks = [objects.Disk.FromDict(disk) for disk in disks] 454 return backend.DrbdAttachNet(disks, instance_name, multimaster)
455 456 @staticmethod
457 - def perspective_drbd_wait_sync(params):
458 """Wait until DRBD disks are synched. 459 460 Note that this is only valid for drbd disks, so the members of the 461 disk list must all be drbd devices. 462 463 """ 464 (disks,) = params 465 disks = [objects.Disk.FromDict(disk) for disk in disks] 466 return backend.DrbdWaitSync(disks)
467 468 @staticmethod
470 """Checks if the drbd devices need activation 471 472 Note that this is only valid for drbd disks, so the members of the 473 disk list must all be drbd devices. 474 475 """ 476 (disks,) = params 477 disks = [objects.Disk.FromDict(disk) for disk in disks] 478 return backend.DrbdNeedsActivation(disks)
479 480 @staticmethod
482 """Query drbd helper. 483 484 """ 485 return backend.GetDrbdUsermodeHelper()
486 487 # export/import -------------------------- 488 489 @staticmethod
490 - def perspective_finalize_export(params):
491 """Expose the finalize export functionality. 492 493 """ 494 instance = objects.Instance.FromDict(params[0]) 495 496 snap_disks = [] 497 for disk in params[1]: 498 if isinstance(disk, bool): 499 snap_disks.append(disk) 500 else: 501 snap_disks.append(objects.Disk.FromDict(disk)) 502 503 return backend.FinalizeExport(instance, snap_disks)
504 505 @staticmethod
506 - def perspective_export_info(params):
507 """Query information about an existing export on this node. 508 509 The given path may not contain an export, in which case we return 510 None. 511 512 """ 513 path = params[0] 514 return backend.ExportInfo(path)
515 516 @staticmethod
517 - def perspective_export_list(params):
518 """List the available exports on this node. 519 520 Note that as opposed to export_info, which may query data about an 521 export in any path, this only queries the standard Ganeti path 522 (pathutils.EXPORT_DIR). 523 524 """ 525 return backend.ListExports()
526 527 @staticmethod
528 - def perspective_export_remove(params):
529 """Remove an export. 530 531 """ 532 export = params[0] 533 return backend.RemoveExport(export)
534 535 # block device --------------------- 536 @staticmethod
537 - def perspective_bdev_sizes(params):
538 """Query the list of block devices 539 540 """ 541 devices = params[0] 542 return backend.GetBlockDevSizes(devices)
543 544 # volume -------------------------- 545 546 @staticmethod
547 - def perspective_lv_list(params):
548 """Query the list of logical volumes in a given volume group. 549 550 """ 551 vgname = params[0] 552 return backend.GetVolumeList(vgname)
553 554 @staticmethod
555 - def perspective_vg_list(params):
556 """Query the list of volume groups. 557 558 """ 559 return backend.ListVolumeGroups()
560 561 # Storage -------------------------- 562 563 @staticmethod
564 - def perspective_storage_list(params):
565 """Get list of storage units. 566 567 """ 568 (su_name, su_args, name, fields) = params 569 return container.GetStorage(su_name, *su_args).List(name, fields)
570 571 @staticmethod
572 - def perspective_storage_modify(params):
573 """Modify a storage unit. 574 575 """ 576 (su_name, su_args, name, changes) = params 577 return container.GetStorage(su_name, *su_args).Modify(name, changes)
578 579 @staticmethod
580 - def perspective_storage_execute(params):
581 """Execute an operation on a storage unit. 582 583 """ 584 (su_name, su_args, name, op) = params 585 return container.GetStorage(su_name, *su_args).Execute(name, op)
586 587 # bridge -------------------------- 588 589 @staticmethod
590 - def perspective_bridges_exist(params):
591 """Check if all bridges given exist on this node. 592 593 """ 594 bridges_list = params[0] 595 return backend.BridgesExist(bridges_list)
596 597 # instance -------------------------- 598 599 @staticmethod
600 - def perspective_instance_os_add(params):
601 """Install an OS on a given instance. 602 603 """ 604 inst_s = params[0] 605 inst = objects.Instance.FromDict(inst_s) 606 reinstall = params[1] 607 debug = params[2] 608 return backend.InstanceOsAdd(inst, reinstall, debug)
609 610 @staticmethod
612 """Runs the OS rename script for an instance. 613 614 """ 615 inst_s, old_name, debug = params 616 inst = objects.Instance.FromDict(inst_s) 617 return backend.RunRenameInstance(inst, old_name, debug)
618 619 @staticmethod
620 - def perspective_instance_shutdown(params):
621 """Shutdown an instance. 622 623 """ 624 instance = objects.Instance.FromDict(params[0]) 625 timeout = params[1] 626 trail = params[2] 627 _extendReasonTrail(trail, "shutdown") 628 return backend.InstanceShutdown(instance, timeout, trail)
629 630 @staticmethod
631 - def perspective_instance_start(params):
632 """Start an instance. 633 634 """ 635 (instance_name, startup_paused, trail) = params 636 instance = objects.Instance.FromDict(instance_name) 637 _extendReasonTrail(trail, "start") 638 return backend.StartInstance(instance, startup_paused, trail)
639 640 @staticmethod
641 - def perspective_hotplug_device(params):
642 """Hotplugs device to a running instance. 643 644 """ 645 (idict, action, dev_type, ddict, extra, seq) = params 646 instance = objects.Instance.FromDict(idict) 647 if dev_type == constants.HOTPLUG_TARGET_DISK: 648 device = objects.Disk.FromDict(ddict) 649 elif dev_type == constants.HOTPLUG_TARGET_NIC: 650 device = objects.NIC.FromDict(ddict) 651 else: 652 assert dev_type in constants.HOTPLUG_ALL_TARGETS 653 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
654 655 @staticmethod
656 - def perspective_hotplug_supported(params):
657 """Checks if hotplug is supported. 658 659 """ 660 instance = objects.Instance.FromDict(params[0]) 661 return backend.HotplugSupported(instance)
662 663 @staticmethod
665 """Modify instance metadata. 666 667 """ 668 instance = params[0] 669 return backend.ModifyInstanceMetadata(instance)
670 671 @staticmethod
672 - def perspective_migration_info(params):
673 """Gather information about an instance to be migrated. 674 675 """ 676 instance = objects.Instance.FromDict(params[0]) 677 return backend.MigrationInfo(instance)
678 679 @staticmethod
680 - def perspective_accept_instance(params):
681 """Prepare the node to accept an instance. 682 683 """ 684 instance, info, target = params 685 instance = objects.Instance.FromDict(instance) 686 return backend.AcceptInstance(instance, info, target)
687 688 @staticmethod
690 """Finalize the instance migration on the destination node. 691 692 """ 693 instance, info, success = params 694 instance = objects.Instance.FromDict(instance) 695 return backend.FinalizeMigrationDst(instance, info, success)
696 697 @staticmethod
698 - def perspective_instance_migrate(params):
699 """Migrates an instance. 700 701 """ 702 cluster_name, instance, target, live = params 703 instance = objects.Instance.FromDict(instance) 704 return backend.MigrateInstance(cluster_name, instance, target, live)
705 706 @staticmethod
708 """Finalize the instance migration on the source node. 709 710 """ 711 instance, success, live = params 712 instance = objects.Instance.FromDict(instance) 713 return backend.FinalizeMigrationSource(instance, success, live)
714 715 @staticmethod
717 """Reports migration status. 718 719 """ 720 instance = objects.Instance.FromDict(params[0]) 721 return backend.GetMigrationStatus(instance).ToDict()
722 723 @staticmethod
724 - def perspective_instance_reboot(params):
725 """Reboot an instance. 726 727 """ 728 instance = objects.Instance.FromDict(params[0]) 729 reboot_type = params[1] 730 shutdown_timeout = params[2] 731 trail = params[3] 732 _extendReasonTrail(trail, "reboot") 733 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 734 trail)
735 736 @staticmethod
738 """Modify instance runtime memory. 739 740 """ 741 instance_dict, memory = params 742 instance = objects.Instance.FromDict(instance_dict) 743 return backend.InstanceBalloonMemory(instance, memory)
744 745 @staticmethod
746 - def perspective_instance_info(params):
747 """Query instance information. 748 749 """ 750 (instance_name, hypervisor_name, hvparams) = params 751 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
752 753 @staticmethod
755 """Query whether the specified instance can be migrated. 756 757 """ 758 instance = objects.Instance.FromDict(params[0]) 759 return backend.GetInstanceMigratable(instance)
760 761 @staticmethod
763 """Query information about all instances. 764 765 """ 766 (hypervisor_list, all_hvparams) = params 767 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
768 769 @staticmethod
771 """Query information on how to get console access to instances 772 773 """ 774 return backend.GetInstanceConsoleInfo(params)
775 776 @staticmethod
777 - def perspective_instance_list(params):
778 """Query the list of running instances. 779 780 """ 781 (hypervisor_list, hvparams) = params 782 return backend.GetInstanceList(hypervisor_list, hvparams)
783 784 # node -------------------------- 785 786 @staticmethod
788 """Checks if a node has the given ip address. 789 790 """ 791 return netutils.IPAddress.Own(params[0])
792 793 @staticmethod
794 - def perspective_node_info(params):
795 """Query node information. 796 797 """ 798 (storage_units, hv_specs) = params 799 return backend.GetNodeInfo(storage_units, hv_specs)
800 801 @staticmethod
802 - def perspective_etc_hosts_modify(params):
803 """Modify a node entry in /etc/hosts. 804 805 """ 806 backend.EtcHostsModify(params[0], params[1], params[2]) 807 808 return True
809 810 @staticmethod
811 - def perspective_node_verify(params):
812 """Run a verify sequence on this node. 813 814 """ 815 (what, cluster_name, hvparams, node_groups, groups_cfg) = params 816 return backend.VerifyNode(what, cluster_name, hvparams, 817 node_groups, groups_cfg)
818 819 @classmethod
820 - def perspective_node_verify_light(cls, params):
821 """Run a light verify sequence on this node. 822 823 This call is meant to perform a less strict verification of the node in 824 certain situations. Right now, it is invoked only when a node is just about 825 to be added to a cluster, and even then, it performs the same checks as 826 L{perspective_node_verify}. 827 """ 828 return cls.perspective_node_verify(params)
829 830 @staticmethod
832 """Start the master daemons on this node. 833 834 """ 835 return backend.StartMasterDaemons(params[0])
836 837 @staticmethod
839 """Activate the master IP on this node. 840 841 """ 842 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 843 return backend.ActivateMasterIp(master_params, params[1])
844 845 @staticmethod
847 """Deactivate the master IP on this node. 848 849 """ 850 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 851 return backend.DeactivateMasterIp(master_params, params[1])
852 853 @staticmethod
854 - def perspective_node_stop_master(params):
855 """Stops master daemons on this node. 856 857 """ 858 return backend.StopMasterDaemons()
859 860 @staticmethod
862 """Change the master IP netmask. 863 864 """ 865 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 866 params[3])
867 868 @staticmethod
870 """Cleanup after leaving a cluster. 871 872 """ 873 return backend.LeaveCluster(params[0])
874 875 @staticmethod
876 - def perspective_node_volumes(params):
877 """Query the list of all logical volume groups. 878 879 """ 880 return backend.NodeVolumes()
881 882 @staticmethod
884 """Demote a node from the master candidate role. 885 886 """ 887 return backend.DemoteFromMC()
888 889 @staticmethod
890 - def perspective_node_powercycle(params):
891 """Tries to powercycle the node. 892 893 """ 894 (hypervisor_type, hvparams) = params 895 return backend.PowercycleNode(hypervisor_type, hvparams)
896 897 @staticmethod
899 """Sets up OpenvSwitch on the node. 900 901 """ 902 (ovs_name, ovs_link) = params 903 return backend.ConfigureOVS(ovs_name, ovs_link)
904 905 @staticmethod
907 """Gets the node's public crypto tokens. 908 909 """ 910 token_requests = params[0] 911 return backend.GetCryptoTokens(token_requests)
912 913 @staticmethod
915 """Ensure daemon is running. 916 917 """ 918 (daemon_name, run) = params 919 return backend.EnsureDaemon(daemon_name, run)
920 921 @staticmethod
922 - def perspective_node_ssh_key_add(params):
923 """Distributes a new node's SSH key if authorized. 924 925 """ 926 (node_uuid, node_name, potential_master_candidates, 927 to_authorized_keys, to_public_keys, get_public_keys) = params 928 return backend.AddNodeSshKey(node_uuid, node_name, 929 potential_master_candidates, 930 to_authorized_keys=to_authorized_keys, 931 to_public_keys=to_public_keys, 932 get_public_keys=get_public_keys)
933 934 @staticmethod
936 """Generates a new root SSH key pair on the node. 937 938 """ 939 (node_uuids, node_names, master_candidate_uuids, 940 potential_master_candidates) = params 941 return backend.RenewSshKeys(node_uuids, node_names, 942 master_candidate_uuids, 943 potential_master_candidates)
944 945 @staticmethod
947 """Removes a node's SSH key from the other nodes' SSH files. 948 949 """ 950 (node_uuid, node_name, 951 master_candidate_uuids, potential_master_candidates, 952 from_authorized_keys, from_public_keys, clear_authorized_keys, 953 clear_public_keys, readd) = params 954 return backend.RemoveNodeSshKey(node_uuid, node_name, 955 master_candidate_uuids, 956 potential_master_candidates, 957 from_authorized_keys=from_authorized_keys, 958 from_public_keys=from_public_keys, 959 clear_authorized_keys=clear_authorized_keys, 960 clear_public_keys=clear_public_keys, 961 readd=readd)
962 963 # cluster -------------------------- 964 965 @staticmethod
966 - def perspective_version(params):
967 """Query version information. 968 969 """ 970 return constants.PROTOCOL_VERSION
971 972 @staticmethod
973 - def perspective_upload_file(params):
974 """Upload a file. 975 976 Note that the backend implementation imposes strict rules on which 977 files are accepted. 978 979 """ 980 return backend.UploadFile(*(params[0]))
981 982 @staticmethod
984 """Upload a file. 985 986 Note that the backend implementation imposes strict rules on which 987 files are accepted. 988 989 """ 990 return backend.UploadFile(*params)
991 992 @staticmethod
993 - def perspective_master_node_name(params):
994 """Returns the master node name. 995 996 """ 997 return backend.GetMasterNodeName()
998 999 @staticmethod
1000 - def perspective_run_oob(params):
1001 """Runs oob on node. 1002 1003 """ 1004 output = backend.RunOob(params[0], params[1], params[2], params[3]) 1005 if output: 1006 result = serializer.LoadJson(output) 1007 else: 1008 result = None 1009 return result
1010 1011 @staticmethod
1013 """Runs a restricted command. 1014 1015 """ 1016 (cmd, ) = params 1017 1018 return backend.RunRestrictedCmd(cmd)
1019 1020 @staticmethod
1022 """Write ssconf files. 1023 1024 """ 1025 (values,) = params 1026 return ssconf.WriteSsconfFiles(values)
1027 1028 @staticmethod
1029 - def perspective_get_watcher_pause(params):
1030 """Get watcher pause end. 1031 1032 """ 1033 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
1034 1035 @staticmethod
1036 - def perspective_set_watcher_pause(params):
1037 """Set watcher pause. 1038 1039 """ 1040 (until, ) = params 1041 return backend.SetWatcherPause(until)
1042 1043 @staticmethod
1044 - def perspective_get_file_info(params):
1045 """Get info on whether a file exists and its properties. 1046 1047 """ 1048 (path, ) = params 1049 return backend.GetFileInfo(path)
1050 1051 # os ----------------------- 1052 1053 @staticmethod
1054 - def perspective_os_diagnose(params):
1055 """Query detailed information about existing OSes. 1056 1057 """ 1058 return backend.DiagnoseOS()
1059 1060 @staticmethod
1061 - def perspective_os_validate(params):
1062 """Run a given OS' validation routine. 1063 1064 """ 1065 required, name, checks, params, force_variant = params 1066 return backend.ValidateOS(required, name, checks, params, force_variant)
1067 1068 @staticmethod
1069 - def perspective_os_export(params):
1070 """Export an OS definition into an instance specific package. 1071 1072 """ 1073 instance = objects.Instance.FromDict(params[0]) 1074 override_env = params[1] 1075 return backend.ExportOS(instance, override_env)
1076 1077 # extstorage ----------------------- 1078 1079 @staticmethod
1081 """Query detailed information about existing extstorage providers. 1082 1083 """ 1084 return backend.DiagnoseExtStorage()
1085 1086 # hooks ----------------------- 1087 1088 @staticmethod
1089 - def perspective_hooks_runner(params):
1090 """Run hook scripts. 1091 1092 """ 1093 hpath, phase, env = params 1094 hr = backend.HooksRunner() 1095 return hr.RunHooks(hpath, phase, env)
1096 1097 # iallocator ----------------- 1098 1099 @staticmethod
1100 - def perspective_iallocator_runner(params):
1101 """Run an iallocator script. 1102 1103 """ 1104 name, idata, ial_params_dict = params 1105 ial_params = [] 1106 for ial_param in ial_params_dict.items(): 1107 ial_params.append("--" + ial_param[0] + "=" + ial_param[1]) 1108 iar = backend.IAllocatorRunner() 1109 return iar.Run(name, idata, ial_params)
1110 1111 # test ----------------------- 1112 1113 @staticmethod
1114 - def perspective_test_delay(params):
1115 """Run test delay. 1116 1117 """ 1118 duration = params[0] 1119 status, rval = utils.TestDelay(duration) 1120 if not status: 1121 raise backend.RPCFail(rval) 1122 return rval
1123 1124 # file storage --------------- 1125 1126 @staticmethod
1128 """Create the file storage directory. 1129 1130 """ 1131 file_storage_dir = params[0] 1132 return backend.CreateFileStorageDir(file_storage_dir)
1133 1134 @staticmethod
1136 """Remove the file storage directory. 1137 1138 """ 1139 file_storage_dir = params[0] 1140 return backend.RemoveFileStorageDir(file_storage_dir)
1141 1142 @staticmethod
1144 """Rename the file storage directory. 1145 1146 """ 1147 old_file_storage_dir = params[0] 1148 new_file_storage_dir = params[1] 1149 return backend.RenameFileStorageDir(old_file_storage_dir, 1150 new_file_storage_dir)
1151 1152 # jobs ------------------------ 1153 1154 @staticmethod 1155 @_RequireJobQueueLock
1156 - def perspective_jobqueue_update(params):
1157 """Update job queue. 1158 1159 """ 1160 (file_name, content) = params 1161 return backend.JobQueueUpdate(file_name, content)
1162 1163 @staticmethod 1164 @_RequireJobQueueLock
1165 - def perspective_jobqueue_purge(params):
1166 """Purge job queue. 1167 1168 """ 1169 return backend.JobQueuePurge()
1170 1171 @staticmethod 1172 @_RequireJobQueueLock
1173 - def perspective_jobqueue_rename(params):
1174 """Rename a job queue file. 1175 1176 """ 1177 # TODO: What if a file fails to rename? 1178 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1179 1180 @staticmethod 1181 @_RequireJobQueueLock
1183 """Set job queue's drain flag. 1184 1185 """ 1186 (flag, ) = params 1187 1188 return jstore.SetDrainFlag(flag)
1189 1190 # hypervisor --------------- 1191 1192 @staticmethod
1194 """Validate the hypervisor parameters. 1195 1196 """ 1197 (hvname, hvparams) = params 1198 return backend.ValidateHVParams(hvname, hvparams)
1199 1200 # Crypto 1201 1202 @staticmethod
1203 - def perspective_x509_cert_create(params):
1204 """Creates a new X509 certificate for SSL/TLS. 1205 1206 """ 1207 (validity, ) = params 1208 return backend.CreateX509Certificate(validity)
1209 1210 @staticmethod
1211 - def perspective_x509_cert_remove(params):
1212 """Removes a X509 certificate. 1213 1214 """ 1215 (name, ) = params 1216 return backend.RemoveX509Certificate(name)
1217 1218 # Import and export 1219 1220 @staticmethod
1221 - def perspective_import_start(params):
1222 """Starts an import daemon. 1223 1224 """ 1225 (opts_s, instance, component, (dest, dest_args)) = params 1226 1227 opts = objects.ImportExportOptions.FromDict(opts_s) 1228 1229 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1230 None, None, 1231 objects.Instance.FromDict(instance), 1232 component, dest, 1233 _DecodeImportExportIO(dest, 1234 dest_args))
1235 1236 @staticmethod
1237 - def perspective_export_start(params):
1238 """Starts an export daemon. 1239 1240 """ 1241 (opts_s, host, port, instance, component, (source, source_args)) = params 1242 1243 opts = objects.ImportExportOptions.FromDict(opts_s) 1244 1245 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1246 host, port, 1247 objects.Instance.FromDict(instance), 1248 component, source, 1249 _DecodeImportExportIO(source, 1250 source_args))
1251 1252 @staticmethod
1253 - def perspective_impexp_status(params):
1254 """Retrieves the status of an import or export daemon. 1255 1256 """ 1257 return backend.GetImportExportStatus(params[0])
1258 1259 @staticmethod
1260 - def perspective_impexp_abort(params):
1261 """Aborts an import or export. 1262 1263 """ 1264 return backend.AbortImportExport(params[0])
1265 1266 @staticmethod
1267 - def perspective_impexp_cleanup(params):
1268 """Cleans up after an import or export. 1269 1270 """ 1271 return backend.CleanupImportExport(params[0])
1272
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Verifies that no positional arguments were given and that the Python
  installation provides the string-escape codec; exits with failure
  otherwise.

  """
  def _Abort(message):
    # Print the diagnostic and terminate with the daemon failure code
    print >> sys.stderr, message
    sys.exit(constants.EXIT_FAILURE)

  if args:  # noded doesn't take any arguments
    _Abort("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0])
  try:
    codecs.lookup("string-escape")
  except LookupError:
    _Abort("Can't load the string-escape code which is part"
           " of the Python installation. Is your installation"
           " complete/correct? Aborting.")
1289
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate
  @type errnum: integer
  @param errnum: OpenSSL error number of the verification step (unused here)
  @type errdepth: integer
  @param errdepth: number of the step in the certificate chain starting at 0
    for the actual client certificate.
  @type ok: integer
  @param ok: result of OpenSSL's own verification (unused here)

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613

  # If we receive a certificate from the certificate chain that is higher
  # than the lowest element of the chain, we have to check it against the
  # server certificate.
  if errdepth > 0:
    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    match = cert.digest("sha1") == server_digest
    if not match:
      # FIX: corrected typo "certficate" in the logged message
      logging.debug("Received certificate from the certificate chain, which"
                    " does not match the server certificate. Digest of the"
                    " received certificate: %s. Digest of the server"
                    " certificate: %s.", cert.digest("sha1"), server_digest)
    return match
  elif errdepth == 0:
    sstore = ssconf.SimpleStore()
    try:
      candidate_certs = sstore.GetMasterCandidatesCertMap()
    except errors.ConfigurationError:
      logging.info("No candidate certificates found. Switching to "
                   "bootstrap/update mode.")
      candidate_certs = None
    if not candidate_certs:
      # Bootstrap/upgrade phase: fall back to the node's own server
      # certificate digest (see docstring)
      candidate_certs = {
        constants.CRYPTO_BOOTSTRAP: utils.GetCertificateDigest(
          cert_filename=pathutils.NODED_CERT_FILE)}
    match = cert.digest("sha1") in candidate_certs.values()
    if not match:
      logging.debug("Received certificate which is not a certificate of a"
                    " master candidate. Certificate digest: %s. List of master"
                    " candidate certificate digests: %s.", cert.digest("sha1"),
                    str(candidate_certs))
    return match
  else:
    # Negative depth should never happen; reject defensively
    logging.error("Invalid errdepth value: %s.", errdepth)
    return False
  # pylint: enable=W0613
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  Sets up memory locking (if requested), SSL parameters, the job queue
  lock and the HTTP server, then returns the pieces needed by ExecNoded.

  """
  # Default executor; replaced below only when mlock succeeds
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  queue_error = _PrepareQueueLock()
  if queue_error is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_error)

  handler = NodeRequestHandler()
  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class,
                           ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)
1403
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the main loop prepared by PrepNoded and always stops the HTTP
  server on the way out.

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # Ensure the listening socket is released even on abnormal exit
    server.Stop()
def Main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  runner.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True,
                     warn_breach=True)
1434