Package ganeti :: Package server :: Module noded
[hide private]
[frames] | no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
  41  import os 
  42  import sys 
  43  import logging 
  44  import signal 
  45  import codecs 
  46   
  47  from optparse import OptionParser 
  48   
  49  from ganeti import backend 
  50  from ganeti import constants 
  51  from ganeti import objects 
  52  from ganeti import errors 
  53  from ganeti import jstore 
  54  from ganeti import daemon 
  55  from ganeti import http 
  56  from ganeti import utils 
  57  from ganeti.storage import container 
  58  from ganeti import serializer 
  59  from ganeti import netutils 
  60  from ganeti import pathutils 
  61  from ganeti import ssconf 
  62   
  63  import ganeti.http.server # pylint: disable=W0611 
  64   
  65   
  66  queue_lock = None 
67 68 69 -def _extendReasonTrail(trail, source, reason=""):
70 """Extend the reason trail with noded information 71 72 The trail is extended by appending the name of the noded functionality 73 """ 74 assert trail is not None 75 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source) 76 trail.append((trail_source, reason, utils.EpochNano()))
77
78 79 -def _PrepareQueueLock():
80 """Try to prepare the queue lock. 81 82 @return: None for success, otherwise an exception object 83 84 """ 85 global queue_lock # pylint: disable=W0603 86 87 if queue_lock is not None: 88 return None 89 90 # Prepare job queue 91 try: 92 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 93 return None 94 except EnvironmentError, err: 95 return err
96
97 98 -def _RequireJobQueueLock(fn):
99 """Decorator for job queue manipulating functions. 100 101 """ 102 QUEUE_LOCK_TIMEOUT = 10 103 104 def wrapper(*args, **kwargs): 105 # Locking in exclusive, blocking mode because there could be several 106 # children running at the same time. Waiting up to 10 seconds. 107 if _PrepareQueueLock() is not None: 108 raise errors.JobQueueError("Job queue failed initialization," 109 " cannot update jobs") 110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) 111 try: 112 return fn(*args, **kwargs) 113 finally: 114 queue_lock.Unlock()
115 116 return wrapper 117
118 119 -def _DecodeImportExportIO(ieio, ieioargs):
120 """Decodes import/export I/O information. 121 122 """ 123 if ieio == constants.IEIO_RAW_DISK: 124 assert len(ieioargs) == 1 125 return (objects.Disk.FromDict(ieioargs[0]), ) 126 127 if ieio == constants.IEIO_SCRIPT: 128 assert len(ieioargs) == 2 129 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1]) 130 131 return ieioargs
132
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
150 151 -class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
152 """Subclass ensuring request handlers are locked in RAM. 153 154 """
155 - def __init__(self, *args, **kwargs):
156 utils.Mlockall() 157 158 http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
159
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
170 - def __init__(self):
171 http.server.HttpServerHandler.__init__(self) 172 self.noded_pid = os.getpid()
173
174 - def HandleRequest(self, req):
175 """Handle a request. 176 177 """ 178 179 if req.request_method.upper() != http.HTTP_POST: 180 raise http.HttpBadRequest("Only the POST method is supported") 181 182 path = req.request_path 183 if path.startswith("/"): 184 path = path[1:] 185 186 method = getattr(self, "perspective_%s" % path, None) 187 if method is None: 188 raise http.HttpNotFound() 189 190 try: 191 result = (True, method(serializer.LoadJson(req.request_body))) 192 193 except backend.RPCFail, err: 194 # our custom failure exception; str(err) works fine if the 195 # exception was constructed with a single argument, and in 196 # this case, err.message == err.args[0] == str(err) 197 result = (False, str(err)) 198 except errors.QuitGanetiException, err: 199 # Tell parent to quit 200 logging.info("Shutting down the node daemon, arguments: %s", 201 str(err.args)) 202 os.kill(self.noded_pid, signal.SIGTERM) 203 # And return the error's arguments, which must be already in 204 # correct tuple format 205 result = err.args 206 except Exception, err: 207 logging.exception("Error in RPC call") 208 result = (False, "Error while executing backend function: %s" % str(err)) 209 210 return serializer.DumpJson(result)
211 212 # the new block devices -------------------------- 213 214 @staticmethod
215 - def perspective_blockdev_create(params):
216 """Create a block device. 217 218 """ 219 (bdev_s, size, owner, on_primary, info, excl_stor) = params 220 bdev = objects.Disk.FromDict(bdev_s) 221 if bdev is None: 222 raise ValueError("can't unserialize data!") 223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 224 excl_stor)
225 226 @staticmethod
227 - def perspective_blockdev_convert(params):
228 """Copy data from source block device to target. 229 230 """ 231 disk_src, disk_dest = params 232 bdev_src = objects.Disk.FromDict(disk_src) 233 bdev_dest = objects.Disk.FromDict(disk_dest) 234 return backend.BlockdevConvert(bdev_src, bdev_dest)
235 236 @staticmethod
238 """Pause/resume sync of a block device. 239 240 """ 241 disks_s, pause = params 242 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 243 return backend.BlockdevPauseResumeSync(disks, pause)
244 245 @staticmethod
246 - def perspective_blockdev_image(params):
247 """Image a block device. 248 249 """ 250 bdev_s, image, size = params 251 bdev = objects.Disk.FromDict(bdev_s) 252 return backend.BlockdevImage(bdev, image, size)
253 254 @staticmethod
255 - def perspective_blockdev_wipe(params):
256 """Wipe a block device. 257 258 """ 259 bdev_s, offset, size = params 260 bdev = objects.Disk.FromDict(bdev_s) 261 return backend.BlockdevWipe(bdev, offset, size)
262 263 @staticmethod
264 - def perspective_blockdev_remove(params):
265 """Remove a block device. 266 267 """ 268 bdev_s = params[0] 269 bdev = objects.Disk.FromDict(bdev_s) 270 return backend.BlockdevRemove(bdev)
271 272 @staticmethod
273 - def perspective_blockdev_rename(params):
274 """Remove a block device. 275 276 """ 277 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]] 278 return backend.BlockdevRename(devlist)
279 280 @staticmethod
281 - def perspective_blockdev_assemble(params):
282 """Assemble a block device. 283 284 """ 285 bdev_s, idict, on_primary, idx = params 286 bdev = objects.Disk.FromDict(bdev_s) 287 instance = objects.Instance.FromDict(idict) 288 if bdev is None: 289 raise ValueError("can't unserialize data!") 290 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
291 292 @staticmethod
293 - def perspective_blockdev_shutdown(params):
294 """Shutdown a block device. 295 296 """ 297 bdev_s = params[0] 298 bdev = objects.Disk.FromDict(bdev_s) 299 if bdev is None: 300 raise ValueError("can't unserialize data!") 301 return backend.BlockdevShutdown(bdev)
302 303 @staticmethod
305 """Add a child to a mirror device. 306 307 Note: this is only valid for mirror devices. It's the caller's duty 308 to send a correct disk, otherwise we raise an error. 309 310 """ 311 bdev_s, ndev_s = params 312 bdev = objects.Disk.FromDict(bdev_s) 313 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 314 if bdev is None or ndevs.count(None) > 0: 315 raise ValueError("can't unserialize data!") 316 return backend.BlockdevAddchildren(bdev, ndevs)
317 318 @staticmethod
320 """Remove a child from a mirror device. 321 322 This is only valid for mirror devices, of course. It's the callers 323 duty to send a correct disk, otherwise we raise an error. 324 325 """ 326 bdev_s, ndev_s = params 327 bdev = objects.Disk.FromDict(bdev_s) 328 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 329 if bdev is None or ndevs.count(None) > 0: 330 raise ValueError("can't unserialize data!") 331 return backend.BlockdevRemovechildren(bdev, ndevs)
332 333 @staticmethod
335 """Return the mirror status for a list of disks. 336 337 """ 338 disks = [objects.Disk.FromDict(dsk_s) 339 for dsk_s in params[0]] 340 return [status.ToDict() 341 for status in backend.BlockdevGetmirrorstatus(disks)]
342 343 @staticmethod
345 """Return the mirror status for a list of disks. 346 347 """ 348 (node_disks, ) = params 349 350 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 351 352 result = [] 353 354 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 355 if success: 356 result.append((success, status.ToDict())) 357 else: 358 result.append((success, status)) 359 360 return result
361 362 @staticmethod
363 - def perspective_blockdev_find(params):
364 """Expose the FindBlockDevice functionality for a disk. 365 366 This will try to find but not activate a disk. 367 368 """ 369 disk = objects.Disk.FromDict(params[0]) 370 371 result = backend.BlockdevFind(disk) 372 if result is None: 373 return None 374 375 return result.ToDict()
376 377 @staticmethod
378 - def perspective_blockdev_snapshot(params):
379 """Create a snapshot device. 380 381 Note that this is only valid for LVM and ExtStorage disks, if we get passed 382 something else we raise an exception. The snapshot device can be 383 remove by calling the generic block device remove call. 384 385 """ 386 (disk, snap_name, snap_size) = params 387 cfbd = objects.Disk.FromDict(disk) 388 return backend.BlockdevSnapshot(cfbd, snap_name, snap_size)
389 390 @staticmethod
391 - def perspective_blockdev_grow(params):
392 """Grow a stack of devices. 393 394 """ 395 if len(params) < 5: 396 raise ValueError("Received only %s parameters in blockdev_grow," 397 " old master?" % len(params)) 398 cfbd = objects.Disk.FromDict(params[0]) 399 amount = params[1] 400 dryrun = params[2] 401 backingstore = params[3] 402 excl_stor = params[4] 403 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
404 405 @staticmethod
406 - def perspective_blockdev_close(params):
407 """Closes the given block devices. 408 409 """ 410 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 411 return backend.BlockdevClose(params[0], disks)
412 413 @staticmethod
414 - def perspective_blockdev_open(params):
415 """Opens the given block devices. 416 417 """ 418 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 419 exclusive = params[2] 420 return backend.BlockdevOpen(params[0], disks, exclusive)
421 422 @staticmethod
424 """Compute the sizes of the given block devices. 425 426 """ 427 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 428 return backend.BlockdevGetdimensions(disks)
429 430 @staticmethod
431 - def perspective_blockdev_setinfo(params):
432 """Sets metadata information on the given block device. 433 434 """ 435 (disk, info) = params 436 disk = objects.Disk.FromDict(disk) 437 return backend.BlockdevSetInfo(disk, info)
438 439 # blockdev/drbd specific methods ---------- 440 441 @staticmethod
443 """Disconnects the network connection of drbd disks. 444 445 Note that this is only valid for drbd disks, so the members of the 446 disk list must all be drbd devices. 447 448 """ 449 (disks,) = params 450 disks = [objects.Disk.FromDict(disk) for disk in disks] 451 return backend.DrbdDisconnectNet(disks)
452 453 @staticmethod
454 - def perspective_drbd_attach_net(params):
455 """Attaches the network connection of drbd disks. 456 457 Note that this is only valid for drbd disks, so the members of the 458 disk list must all be drbd devices. 459 460 """ 461 disks, multimaster = params 462 disks = [objects.Disk.FromDict(disk) for disk in disks] 463 return backend.DrbdAttachNet(disks, multimaster)
464 465 @staticmethod
466 - def perspective_drbd_wait_sync(params):
467 """Wait until DRBD disks are synched. 468 469 Note that this is only valid for drbd disks, so the members of the 470 disk list must all be drbd devices. 471 472 """ 473 (disks,) = params 474 disks = [objects.Disk.FromDict(disk) for disk in disks] 475 return backend.DrbdWaitSync(disks)
476 477 @staticmethod
479 """Checks if the drbd devices need activation 480 481 Note that this is only valid for drbd disks, so the members of the 482 disk list must all be drbd devices. 483 484 """ 485 (disks,) = params 486 disks = [objects.Disk.FromDict(disk) for disk in disks] 487 return backend.DrbdNeedsActivation(disks)
488 489 @staticmethod
491 """Query drbd helper. 492 493 """ 494 return backend.GetDrbdUsermodeHelper()
495 496 # export/import -------------------------- 497 498 @staticmethod
499 - def perspective_finalize_export(params):
500 """Expose the finalize export functionality. 501 502 """ 503 instance = objects.Instance.FromDict(params[0]) 504 505 snap_disks = [] 506 for disk in params[1]: 507 if isinstance(disk, bool): 508 snap_disks.append(disk) 509 else: 510 snap_disks.append(objects.Disk.FromDict(disk)) 511 512 return backend.FinalizeExport(instance, snap_disks)
513 514 @staticmethod
515 - def perspective_export_info(params):
516 """Query information about an existing export on this node. 517 518 The given path may not contain an export, in which case we return 519 None. 520 521 """ 522 path = params[0] 523 return backend.ExportInfo(path)
524 525 @staticmethod
526 - def perspective_export_list(params):
527 """List the available exports on this node. 528 529 Note that as opposed to export_info, which may query data about an 530 export in any path, this only queries the standard Ganeti path 531 (pathutils.EXPORT_DIR). 532 533 """ 534 return backend.ListExports()
535 536 @staticmethod
537 - def perspective_export_remove(params):
538 """Remove an export. 539 540 """ 541 export = params[0] 542 return backend.RemoveExport(export)
543 544 # block device --------------------- 545 @staticmethod
546 - def perspective_bdev_sizes(params):
547 """Query the list of block devices 548 549 """ 550 devices = params[0] 551 return backend.GetBlockDevSizes(devices)
552 553 # volume -------------------------- 554 555 @staticmethod
556 - def perspective_lv_list(params):
557 """Query the list of logical volumes in a given volume group. 558 559 """ 560 vgname = params[0] 561 return backend.GetVolumeList(vgname)
562 563 @staticmethod
564 - def perspective_vg_list(params):
565 """Query the list of volume groups. 566 567 """ 568 return backend.ListVolumeGroups()
569 570 # Storage -------------------------- 571 572 @staticmethod
573 - def perspective_storage_list(params):
574 """Get list of storage units. 575 576 """ 577 (su_name, su_args, name, fields) = params 578 return container.GetStorage(su_name, *su_args).List(name, fields)
579 580 @staticmethod
581 - def perspective_storage_modify(params):
582 """Modify a storage unit. 583 584 """ 585 (su_name, su_args, name, changes) = params 586 return container.GetStorage(su_name, *su_args).Modify(name, changes)
587 588 @staticmethod
589 - def perspective_storage_execute(params):
590 """Execute an operation on a storage unit. 591 592 """ 593 (su_name, su_args, name, op) = params 594 return container.GetStorage(su_name, *su_args).Execute(name, op)
595 596 # bridge -------------------------- 597 598 @staticmethod
599 - def perspective_bridges_exist(params):
600 """Check if all bridges given exist on this node. 601 602 """ 603 bridges_list = params[0] 604 return backend.BridgesExist(bridges_list)
605 606 # instance -------------------------- 607 608 @staticmethod
609 - def perspective_instance_os_add(params):
610 """Install an OS on a given instance. 611 612 """ 613 inst_s = params[0] 614 inst = objects.Instance.FromDict(inst_s) 615 reinstall = params[1] 616 debug = params[2] 617 return backend.InstanceOsAdd(inst, reinstall, debug)
618 619 @staticmethod
621 """Runs the OS rename script for an instance. 622 623 """ 624 inst_s, old_name, debug = params 625 inst = objects.Instance.FromDict(inst_s) 626 return backend.RunRenameInstance(inst, old_name, debug)
627 628 @staticmethod
629 - def perspective_instance_shutdown(params):
630 """Shutdown an instance. 631 632 """ 633 instance = objects.Instance.FromDict(params[0]) 634 timeout = params[1] 635 trail = params[2] 636 _extendReasonTrail(trail, "shutdown") 637 return backend.InstanceShutdown(instance, timeout, trail)
638 639 @staticmethod
640 - def perspective_instance_start(params):
641 """Start an instance. 642 643 """ 644 (instance_name, startup_paused, trail) = params 645 instance = objects.Instance.FromDict(instance_name) 646 _extendReasonTrail(trail, "start") 647 return backend.StartInstance(instance, startup_paused, trail)
648 649 @staticmethod
650 - def perspective_hotplug_device(params):
651 """Hotplugs device to a running instance. 652 653 """ 654 (idict, action, dev_type, ddict, extra, seq) = params 655 instance = objects.Instance.FromDict(idict) 656 if dev_type == constants.HOTPLUG_TARGET_DISK: 657 device = objects.Disk.FromDict(ddict) 658 elif dev_type == constants.HOTPLUG_TARGET_NIC: 659 device = objects.NIC.FromDict(ddict) 660 else: 661 assert dev_type in constants.HOTPLUG_ALL_TARGETS 662 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
663 664 @staticmethod
665 - def perspective_hotplug_supported(params):
666 """Checks if hotplug is supported. 667 668 """ 669 instance = objects.Instance.FromDict(params[0]) 670 return backend.HotplugSupported(instance)
671 672 @staticmethod
674 """Modify instance metadata. 675 676 """ 677 instance = params[0] 678 return backend.ModifyInstanceMetadata(instance)
679 680 @staticmethod
681 - def perspective_migration_info(params):
682 """Gather information about an instance to be migrated. 683 684 """ 685 instance = objects.Instance.FromDict(params[0]) 686 return backend.MigrationInfo(instance)
687 688 @staticmethod
689 - def perspective_accept_instance(params):
690 """Prepare the node to accept an instance. 691 692 """ 693 instance, info, target = params 694 instance = objects.Instance.FromDict(instance) 695 return backend.AcceptInstance(instance, info, target)
696 697 @staticmethod
699 """Finalize the instance migration on the destination node. 700 701 """ 702 instance, info, success = params 703 instance = objects.Instance.FromDict(instance) 704 return backend.FinalizeMigrationDst(instance, info, success)
705 706 @staticmethod
707 - def perspective_instance_migrate(params):
708 """Migrates an instance. 709 710 """ 711 cluster_name, instance, target, live = params 712 instance = objects.Instance.FromDict(instance) 713 return backend.MigrateInstance(cluster_name, instance, target, live)
714 715 @staticmethod
717 """Finalize the instance migration on the source node. 718 719 """ 720 instance, success, live = params 721 instance = objects.Instance.FromDict(instance) 722 return backend.FinalizeMigrationSource(instance, success, live)
723 724 @staticmethod
726 """Reports migration status. 727 728 """ 729 instance = objects.Instance.FromDict(params[0]) 730 return backend.GetMigrationStatus(instance).ToDict()
731 732 @staticmethod
733 - def perspective_instance_reboot(params):
734 """Reboot an instance. 735 736 """ 737 instance = objects.Instance.FromDict(params[0]) 738 reboot_type = params[1] 739 shutdown_timeout = params[2] 740 trail = params[3] 741 _extendReasonTrail(trail, "reboot") 742 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 743 trail)
744 745 @staticmethod
747 """Modify instance runtime memory. 748 749 """ 750 instance_dict, memory = params 751 instance = objects.Instance.FromDict(instance_dict) 752 return backend.InstanceBalloonMemory(instance, memory)
753 754 @staticmethod
755 - def perspective_instance_info(params):
756 """Query instance information. 757 758 """ 759 (instance_name, hypervisor_name, hvparams) = params 760 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
761 762 @staticmethod
764 """Query whether the specified instance can be migrated. 765 766 """ 767 instance = objects.Instance.FromDict(params[0]) 768 return backend.GetInstanceMigratable(instance)
769 770 @staticmethod
772 """Query information about all instances. 773 774 """ 775 (hypervisor_list, all_hvparams) = params 776 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
777 778 @staticmethod
780 """Query information on how to get console access to instances 781 782 """ 783 return backend.GetInstanceConsoleInfo(params)
784 785 @staticmethod
786 - def perspective_instance_list(params):
787 """Query the list of running instances. 788 789 """ 790 (hypervisor_list, hvparams) = params 791 return backend.GetInstanceList(hypervisor_list, hvparams)
792 793 # node -------------------------- 794 795 @staticmethod
797 """Checks if a node has the given ip address. 798 799 """ 800 return netutils.IPAddress.Own(params[0])
801 802 @staticmethod
803 - def perspective_node_info(params):
804 """Query node information. 805 806 """ 807 (storage_units, hv_specs) = params 808 return backend.GetNodeInfo(storage_units, hv_specs)
809 810 @staticmethod
811 - def perspective_etc_hosts_modify(params):
812 """Modify a node entry in /etc/hosts. 813 814 """ 815 backend.EtcHostsModify(params[0], params[1], params[2]) 816 817 return True
818 819 @staticmethod
820 - def perspective_node_verify(params):
821 """Run a verify sequence on this node. 822 823 """ 824 (what, cluster_name, hvparams, node_groups, groups_cfg) = params 825 return backend.VerifyNode(what, cluster_name, hvparams, 826 node_groups, groups_cfg)
827 828 @classmethod
829 - def perspective_node_verify_light(cls, params):
830 """Run a light verify sequence on this node. 831 832 This call is meant to perform a less strict verification of the node in 833 certain situations. Right now, it is invoked only when a node is just about 834 to be added to a cluster, and even then, it performs the same checks as 835 L{perspective_node_verify}. 836 """ 837 return cls.perspective_node_verify(params)
838 839 @staticmethod
841 """Start the master daemons on this node. 842 843 """ 844 return backend.StartMasterDaemons(params[0])
845 846 @staticmethod
848 """Activate the master IP on this node. 849 850 """ 851 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 852 return backend.ActivateMasterIp(master_params, params[1])
853 854 @staticmethod
856 """Deactivate the master IP on this node. 857 858 """ 859 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 860 return backend.DeactivateMasterIp(master_params, params[1])
861 862 @staticmethod
863 - def perspective_node_stop_master(params):
864 """Stops master daemons on this node. 865 866 """ 867 return backend.StopMasterDaemons()
868 869 @staticmethod
871 """Change the master IP netmask. 872 873 """ 874 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 875 params[3])
876 877 @staticmethod
879 """Cleanup after leaving a cluster. 880 881 """ 882 return backend.LeaveCluster(params[0])
883 884 @staticmethod
885 - def perspective_node_volumes(params):
886 """Query the list of all logical volume groups. 887 888 """ 889 return backend.NodeVolumes()
890 891 @staticmethod
893 """Demote a node from the master candidate role. 894 895 """ 896 return backend.DemoteFromMC()
897 898 @staticmethod
899 - def perspective_node_powercycle(params):
900 """Tries to powercycle the node. 901 902 """ 903 (hypervisor_type, hvparams) = params 904 return backend.PowercycleNode(hypervisor_type, hvparams)
905 906 @staticmethod
908 """Sets up OpenvSwitch on the node. 909 910 """ 911 (ovs_name, ovs_link) = params 912 return backend.ConfigureOVS(ovs_name, ovs_link)
913 914 @staticmethod
916 """Gets the node's public crypto tokens. 917 918 """ 919 token_requests = params[0] 920 return backend.GetCryptoTokens(token_requests)
921 922 @staticmethod
924 """Ensure daemon is running. 925 926 """ 927 (daemon_name, run) = params 928 return backend.EnsureDaemon(daemon_name, run)
929 930 @staticmethod
931 - def perspective_node_ssh_key_add(params):
932 """Distributes a new node's SSH key if authorized. 933 934 """ 935 (node_uuid, node_name, potential_master_candidates, 936 to_authorized_keys, to_public_keys, get_public_keys) = params 937 return backend.AddNodeSshKey(node_uuid, node_name, 938 potential_master_candidates, 939 to_authorized_keys=to_authorized_keys, 940 to_public_keys=to_public_keys, 941 get_public_keys=get_public_keys)
942 943 @staticmethod
945 """Generates a new root SSH key pair on the node. 946 947 """ 948 (node_uuids, node_names, master_candidate_uuids, 949 potential_master_candidates) = params 950 return backend.RenewSshKeys(node_uuids, node_names, 951 master_candidate_uuids, 952 potential_master_candidates)
953 954 @staticmethod
956 """Removes a node's SSH key from the other nodes' SSH files. 957 958 """ 959 (node_uuid, node_name, 960 master_candidate_uuids, potential_master_candidates, 961 from_authorized_keys, from_public_keys, clear_authorized_keys, 962 clear_public_keys, readd) = params 963 return backend.RemoveNodeSshKey(node_uuid, node_name, 964 master_candidate_uuids, 965 potential_master_candidates, 966 from_authorized_keys=from_authorized_keys, 967 from_public_keys=from_public_keys, 968 clear_authorized_keys=clear_authorized_keys, 969 clear_public_keys=clear_public_keys, 970 readd=readd)
971 972 # cluster -------------------------- 973 974 @staticmethod
975 - def perspective_version(params):
976 """Query version information. 977 978 """ 979 return constants.PROTOCOL_VERSION
980 981 @staticmethod
982 - def perspective_upload_file(params):
983 """Upload a file. 984 985 Note that the backend implementation imposes strict rules on which 986 files are accepted. 987 988 """ 989 return backend.UploadFile(*(params[0]))
990 991 @staticmethod
993 """Upload a file. 994 995 Note that the backend implementation imposes strict rules on which 996 files are accepted. 997 998 """ 999 return backend.UploadFile(*params)
1000 1001 @staticmethod
1002 - def perspective_master_node_name(params):
1003 """Returns the master node name. 1004 1005 """ 1006 return backend.GetMasterNodeName()
1007 1008 @staticmethod
1009 - def perspective_run_oob(params):
1010 """Runs oob on node. 1011 1012 """ 1013 output = backend.RunOob(params[0], params[1], params[2], params[3]) 1014 if output: 1015 result = serializer.LoadJson(output) 1016 else: 1017 result = None 1018 return result
1019 1020 @staticmethod
1022 """Runs a restricted command. 1023 1024 """ 1025 (cmd, ) = params 1026 1027 return backend.RunRestrictedCmd(cmd)
1028 1029 @staticmethod
1031 """Write ssconf files. 1032 1033 """ 1034 (values,) = params 1035 return ssconf.WriteSsconfFiles(values)
1036 1037 @staticmethod
1038 - def perspective_get_watcher_pause(params):
1039 """Get watcher pause end. 1040 1041 """ 1042 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
1043 1044 @staticmethod
1045 - def perspective_set_watcher_pause(params):
1046 """Set watcher pause. 1047 1048 """ 1049 (until, ) = params 1050 return backend.SetWatcherPause(until)
1051 1052 @staticmethod
1053 - def perspective_get_file_info(params):
1054 """Get info on whether a file exists and its properties. 1055 1056 """ 1057 (path, ) = params 1058 return backend.GetFileInfo(path)
1059 1060 # os ----------------------- 1061 1062 @staticmethod
1063 - def perspective_os_diagnose(params):
1064 """Query detailed information about existing OSes. 1065 1066 """ 1067 return backend.DiagnoseOS()
1068 1069 @staticmethod
1070 - def perspective_os_validate(params):
1071 """Run a given OS' validation routine. 1072 1073 """ 1074 required, name, checks, params, force_variant = params 1075 return backend.ValidateOS(required, name, checks, params, force_variant)
1076 1077 @staticmethod
1078 - def perspective_os_export(params):
1079 """Export an OS definition into an instance specific package. 1080 1081 """ 1082 instance = objects.Instance.FromDict(params[0]) 1083 override_env = params[1] 1084 return backend.ExportOS(instance, override_env)
1085 1086 # extstorage ----------------------- 1087 1088 @staticmethod
1090 """Query detailed information about existing extstorage providers. 1091 1092 """ 1093 return backend.DiagnoseExtStorage()
1094 1095 # hooks ----------------------- 1096 1097 @staticmethod
1098 - def perspective_hooks_runner(params):
1099 """Run hook scripts. 1100 1101 """ 1102 hpath, phase, env = params 1103 hr = backend.HooksRunner() 1104 return hr.RunHooks(hpath, phase, env)
1105 1106 # iallocator ----------------- 1107 1108 @staticmethod
1109 - def perspective_iallocator_runner(params):
1110 """Run an iallocator script. 1111 1112 """ 1113 name, idata, ial_params_dict = params 1114 ial_params = [] 1115 for ial_param in ial_params_dict.items(): 1116 if ial_param[1] is not None: 1117 ial_params.append("--" + ial_param[0] + "=" + ial_param[1]) 1118 else: 1119 ial_params.append("--" + ial_param[0]) 1120 iar = backend.IAllocatorRunner() 1121 return iar.Run(name, idata, ial_params)
1122 1123 # test ----------------------- 1124 1125 @staticmethod
1126 - def perspective_test_delay(params):
1127 """Run test delay. 1128 1129 """ 1130 duration = params[0] 1131 status, rval = utils.TestDelay(duration) 1132 if not status: 1133 raise backend.RPCFail(rval) 1134 return rval
1135 1136 # file storage --------------- 1137 1138 @staticmethod
1140 """Create the file storage directory. 1141 1142 """ 1143 file_storage_dir = params[0] 1144 return backend.CreateFileStorageDir(file_storage_dir)
1145 1146 @staticmethod
1148 """Remove the file storage directory. 1149 1150 """ 1151 file_storage_dir = params[0] 1152 return backend.RemoveFileStorageDir(file_storage_dir)
1153 1154 @staticmethod
1156 """Rename the file storage directory. 1157 1158 """ 1159 old_file_storage_dir = params[0] 1160 new_file_storage_dir = params[1] 1161 return backend.RenameFileStorageDir(old_file_storage_dir, 1162 new_file_storage_dir)
1163 1164 # jobs ------------------------ 1165 1166 @staticmethod 1167 @_RequireJobQueueLock
1168 - def perspective_jobqueue_update(params):
1169 """Update job queue. 1170 1171 """ 1172 (file_name, content) = params 1173 return backend.JobQueueUpdate(file_name, content)
1174 1175 @staticmethod 1176 @_RequireJobQueueLock
1177 - def perspective_jobqueue_purge(params):
1178 """Purge job queue. 1179 1180 """ 1181 return backend.JobQueuePurge()
1182 1183 @staticmethod 1184 @_RequireJobQueueLock
1185 - def perspective_jobqueue_rename(params):
1186 """Rename a job queue file. 1187 1188 """ 1189 # TODO: What if a file fails to rename? 1190 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1191 1192 @staticmethod 1193 @_RequireJobQueueLock
1195 """Set job queue's drain flag. 1196 1197 """ 1198 (flag, ) = params 1199 1200 return jstore.SetDrainFlag(flag)
1201 1202 # hypervisor --------------- 1203 1204 @staticmethod
1206 """Validate the hypervisor parameters. 1207 1208 """ 1209 (hvname, hvparams) = params 1210 return backend.ValidateHVParams(hvname, hvparams)
1211 1212 # Crypto 1213 1214 @staticmethod
1215 - def perspective_x509_cert_create(params):
1216 """Creates a new X509 certificate for SSL/TLS. 1217 1218 """ 1219 (validity, ) = params 1220 return backend.CreateX509Certificate(validity)
1221 1222 @staticmethod
1223 - def perspective_x509_cert_remove(params):
1224 """Removes a X509 certificate. 1225 1226 """ 1227 (name, ) = params 1228 return backend.RemoveX509Certificate(name)
1229 1230 # Import and export 1231 1232 @staticmethod
1233 - def perspective_import_start(params):
1234 """Starts an import daemon. 1235 1236 """ 1237 (opts_s, instance, component, (dest, dest_args)) = params 1238 1239 opts = objects.ImportExportOptions.FromDict(opts_s) 1240 1241 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1242 None, None, 1243 objects.Instance.FromDict(instance), 1244 component, dest, 1245 _DecodeImportExportIO(dest, 1246 dest_args))
1247 1248 @staticmethod
1249 - def perspective_export_start(params):
1250 """Starts an export daemon. 1251 1252 """ 1253 (opts_s, host, port, instance, component, (source, source_args)) = params 1254 1255 opts = objects.ImportExportOptions.FromDict(opts_s) 1256 1257 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1258 host, port, 1259 objects.Instance.FromDict(instance), 1260 component, source, 1261 _DecodeImportExportIO(source, 1262 source_args))
1263 1264 @staticmethod
1265 - def perspective_impexp_status(params):
1266 """Retrieves the status of an import or export daemon. 1267 1268 """ 1269 return backend.GetImportExportStatus(params[0])
1270 1271 @staticmethod
1272 - def perspective_impexp_abort(params):
1273 """Aborts an import or export. 1274 1275 """ 1276 return backend.AbortImportExport(params[0])
1277 1278 @staticmethod
1279 - def perspective_impexp_cleanup(params):
1280 """Cleans up after an import or export. 1281 1282 """ 1283 return backend.CleanupImportExport(params[0])
1284
1285 1286 -def CheckNoded(_, args):
1287 """Initial checks whether to run or exit with a failure. 1288 1289 """ 1290 if args: # noded doesn't take any arguments 1291 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % 1292 sys.argv[0]) 1293 sys.exit(constants.EXIT_FAILURE) 1294 try: 1295 codecs.lookup("string-escape") 1296 except LookupError: 1297 print >> sys.stderr, ("Can't load the string-escape code which is part" 1298 " of the Python installation. Is your installation" 1299 " complete/correct? Aborting.") 1300 sys.exit(constants.EXIT_FAILURE)
1301
1302 1303 -def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
1304 """Callback function to verify a peer against the candidate cert map. 1305 1306 Note that we have a chicken-and-egg problem during cluster init and upgrade. 1307 This method checks whether the incoming connection comes from a master 1308 candidate by comparing it to the master certificate map in the cluster 1309 configuration. However, during cluster init and cluster upgrade there 1310 are various RPC calls done to the master node itself, before the candidate 1311 certificate list is established and the cluster configuration is written. 1312 In this case, we cannot check against the master candidate map. 1313 1314 This problem is solved by checking whether the candidate map is empty. An 1315 initialized 2.11 or higher cluster has at least one entry for the master 1316 node in the candidate map. If the map is empty, we know that we are still 1317 in the bootstrap/upgrade phase. In this case, we read the server certificate 1318 digest and compare it to the incoming request. 1319 1320 This means that after an upgrade of Ganeti, the system continues to operate 1321 like before, using server certificates only. After the client certificates 1322 are generated with ``gnt-cluster renew-crypto --new-node-certificates``, 1323 RPC communication is switched to using client certificates and the trick of 1324 using server certificates does not work anymore. 1325 1326 @type conn: C{OpenSSL.SSL.Connection} 1327 @param conn: the OpenSSL connection object 1328 @type cert: C{OpenSSL.X509} 1329 @param cert: the peer's SSL certificate 1330 @type errdepth: integer 1331 @param errdepth: number of the step in the certificate chain starting at 0 1332 for the actual client certificate. 1333 1334 """ 1335 # some parameters are unused, but this is the API 1336 # pylint: disable=W0613 1337 1338 # If we receive a certificate from the certificate chain that is higher 1339 # than the lowest element of the chain, we have to check it against the 1340 # server certificate. 1341 if errdepth > 0: 1342 server_digest = utils.GetCertificateDigest( 1343 cert_filename=pathutils.NODED_CERT_FILE) 1344 match = cert.digest("sha1") == server_digest 1345 if not match: 1346 logging.debug("Received certificate from the certificate chain, which" 1347 " does not match the server certficate. Digest of the" 1348 " received certificate: %s. Digest of the server" 1349 " certificate: %s.", cert.digest("sha1"), server_digest) 1350 return match 1351 elif errdepth == 0: 1352 sstore = ssconf.SimpleStore() 1353 try: 1354 candidate_certs = sstore.GetMasterCandidatesCertMap() 1355 except errors.ConfigurationError: 1356 logging.info("No candidate certificates found. Switching to " 1357 "bootstrap/update mode.") 1358 candidate_certs = None 1359 if not candidate_certs: 1360 candidate_certs = { 1361 constants.CRYPTO_BOOTSTRAP: utils.GetCertificateDigest( 1362 cert_filename=pathutils.NODED_CERT_FILE)} 1363 match = cert.digest("sha1") in candidate_certs.values() 1364 if not match: 1365 logging.debug("Received certificate which is not a certificate of a" 1366 " master candidate. Certificate digest: %s. List of master" 1367 " candidate certificate digests: %s.", cert.digest("sha1"), 1368 str(candidate_certs)) 1369 return match 1370 else: 1371 logging.error("Invalid errdepth value: %s.", errdepth) 1372 return False
1373 # pylint: enable=W0613
1374 1375 1376 -def PrepNoded(options, _):
1377 """Preparation node daemon function, executed with the PID file held. 1378 1379 """ 1380 if options.mlock: 1381 request_executor_class = MlockallRequestExecutor 1382 try: 1383 utils.Mlockall() 1384 except errors.NoCtypesError: 1385 logging.warning("Cannot set memory lock, ctypes module not found") 1386 request_executor_class = http.server.HttpServerRequestExecutor 1387 else: 1388 request_executor_class = http.server.HttpServerRequestExecutor 1389 1390 # Read SSL certificate 1391 if options.ssl: 1392 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, 1393 ssl_cert_path=options.ssl_cert) 1394 else: 1395 ssl_params = None 1396 1397 err = _PrepareQueueLock() 1398 if err is not None: 1399 # this might be some kind of file-system/permission error; while 1400 # this breaks the job queue functionality, we shouldn't prevent 1401 # startup of the whole node daemon because of this 1402 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err) 1403 1404 handler = NodeRequestHandler() 1405 1406 mainloop = daemon.Mainloop() 1407 server = \ 1408 http.server.HttpServer(mainloop, options.bind_address, options.port, 1409 handler, ssl_params=ssl_params, ssl_verify_peer=True, 1410 request_executor_class=request_executor_class, 1411 ssl_verify_callback=SSLVerifyPeer) 1412 server.Start() 1413 1414 return (mainloop, server)
1415
1416 1417 -def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1418 """Main node daemon function, executed with the PID file held. 1419 1420 """ 1421 (mainloop, server) = prep_data 1422 try: 1423 mainloop.Run() 1424 finally: 1425 server.Stop() 1426
1427 1428 -def Main():
1429 """Main function for the node daemon. 1430 1431 """ 1432 parser = OptionParser(description="Ganeti node daemon", 1433 usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]" 1434 " [-i INTERFACE]"), 1435 version="%%prog (ganeti) %s" % 1436 constants.RELEASE_VERSION) 1437 parser.add_option("--no-mlock", dest="mlock", 1438 help="Do not mlock the node memory in ram", 1439 default=True, action="store_false") 1440 1441 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded, 1442 default_ssl_cert=pathutils.NODED_CERT_FILE, 1443 default_ssl_key=pathutils.NODED_CERT_FILE, 1444 console_logging=True, 1445 warn_breach=True)
1446