Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
  64   
  65   
  66  queue_lock = None 
def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality.

  @param trail: list of reason trail entries to extend in place
  @param source: name of the noded functionality being recorded
  @param reason: optional free-form reason text

  """
  assert trail is not None
  entry = ("%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source),
           reason,
           utils.EpochNano())
  trail.append(entry)
78 79 -def _PrepareQueueLock():
80 """Try to prepare the queue lock. 81 82 @return: None for success, otherwise an exception object 83 84 """ 85 global queue_lock # pylint: disable=W0603 86 87 if queue_lock is not None: 88 return None 89 90 # Prepare job queue 91 try: 92 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 93 return None 94 except EnvironmentError, err: 95 return err
96
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue lock is initialized, then holds it exclusively
  (with a timeout) around the call to C{fn}.

  @param fn: function to wrap
  @return: wrapped function
  @raise errors.JobQueueError: if the job queue could not be initialized

  """
  QUEUE_LOCK_TIMEOUT = 10

  # functools.wraps preserves fn's name/docstring on the wrapper, which
  # the plain closure used to lose (hurts logging and introspection)
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: serialized arguments for that I/O type
  @return: tuple of decoded arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    # All other I/O types need no decoding
    return ieioargs
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Pin the process' memory via utils.Mlockall() before any request is
    # served, then delegate to the parent constructor unchanged
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember the daemon's PID so a handler can signal the process
    # (see the SIGTERM sent from HandleRequest on QuitGanetiException)
    self.noded_pid = os.getpid()
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the matching C{perspective_*} method
    (derived from the request path) and returns the JSON-encoded result.

    @param req: incoming HTTP request object
    @return: JSON string, normally encoding a C{(success, payload)} tuple
    @raise http.HttpBadRequest: if the HTTP method is not POST
    @raise http.HttpNotFound: if no handler exists for the request path

    """

    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    # Map e.g. "/blockdev_create" to self.perspective_blockdev_create
    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # Catch-all boundary: report the failure to the caller instead of
      # letting the exception escape the HTTP server
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
211 212 # the new block devices -------------------------- 213 214 @staticmethod
215 - def perspective_blockdev_create(params):
216 """Create a block device. 217 218 """ 219 (bdev_s, size, owner, on_primary, info, excl_stor) = params 220 bdev = objects.Disk.FromDict(bdev_s) 221 if bdev is None: 222 raise ValueError("can't unserialize data!") 223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 224 excl_stor)
225 226 @staticmethod
227 - def perspective_blockdev_convert(params):
228 """Copy data from source block device to target. 229 230 """ 231 disk_src, disk_dest = params 232 bdev_src = objects.Disk.FromDict(disk_src) 233 bdev_dest = objects.Disk.FromDict(disk_dest) 234 return backend.BlockdevConvert(bdev_src, bdev_dest)
235 236 @staticmethod
238 """Pause/resume sync of a block device. 239 240 """ 241 disks_s, pause = params 242 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 243 return backend.BlockdevPauseResumeSync(disks, pause)
244 245 @staticmethod
246 - def perspective_blockdev_image(params):
247 """Image a block device. 248 249 """ 250 bdev_s, image, size = params 251 bdev = objects.Disk.FromDict(bdev_s) 252 return backend.BlockdevImage(bdev, image, size)
253 254 @staticmethod
255 - def perspective_blockdev_wipe(params):
256 """Wipe a block device. 257 258 """ 259 bdev_s, offset, size = params 260 bdev = objects.Disk.FromDict(bdev_s) 261 return backend.BlockdevWipe(bdev, offset, size)
262 263 @staticmethod
264 - def perspective_blockdev_remove(params):
265 """Remove a block device. 266 267 """ 268 bdev_s = params[0] 269 bdev = objects.Disk.FromDict(bdev_s) 270 return backend.BlockdevRemove(bdev)
  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    # params[0] is a list of (serialized disk, unique_id) pairs
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
279 280 @staticmethod
281 - def perspective_blockdev_assemble(params):
282 """Assemble a block device. 283 284 """ 285 bdev_s, idict, on_primary, idx = params 286 bdev = objects.Disk.FromDict(bdev_s) 287 instance = objects.Instance.FromDict(idict) 288 if bdev is None: 289 raise ValueError("can't unserialize data!") 290 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
291 292 @staticmethod
293 - def perspective_blockdev_shutdown(params):
294 """Shutdown a block device. 295 296 """ 297 bdev_s = params[0] 298 bdev = objects.Disk.FromDict(bdev_s) 299 if bdev is None: 300 raise ValueError("can't unserialize data!") 301 return backend.BlockdevShutdown(bdev)
302 303 @staticmethod
305 """Add a child to a mirror device. 306 307 Note: this is only valid for mirror devices. It's the caller's duty 308 to send a correct disk, otherwise we raise an error. 309 310 """ 311 bdev_s, ndev_s = params 312 bdev = objects.Disk.FromDict(bdev_s) 313 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 314 if bdev is None or ndevs.count(None) > 0: 315 raise ValueError("can't unserialize data!") 316 return backend.BlockdevAddchildren(bdev, ndevs)
317 318 @staticmethod
320 """Remove a child from a mirror device. 321 322 This is only valid for mirror devices, of course. It's the callers 323 duty to send a correct disk, otherwise we raise an error. 324 325 """ 326 bdev_s, ndev_s = params 327 bdev = objects.Disk.FromDict(bdev_s) 328 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 329 if bdev is None or ndevs.count(None) > 0: 330 raise ValueError("can't unserialize data!") 331 return backend.BlockdevRemovechildren(bdev, ndevs)
332 333 @staticmethod
335 """Return the mirror status for a list of disks. 336 337 """ 338 disks = [objects.Disk.FromDict(dsk_s) 339 for dsk_s in params[0]] 340 return [status.ToDict() 341 for status in backend.BlockdevGetmirrorstatus(disks)]
342 343 @staticmethod
345 """Return the mirror status for a list of disks. 346 347 """ 348 (node_disks, ) = params 349 350 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 351 352 result = [] 353 354 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 355 if success: 356 result.append((success, status.ToDict())) 357 else: 358 result.append((success, status)) 359 360 return result
361 362 @staticmethod
363 - def perspective_blockdev_find(params):
364 """Expose the FindBlockDevice functionality for a disk. 365 366 This will try to find but not activate a disk. 367 368 """ 369 disk = objects.Disk.FromDict(params[0]) 370 371 result = backend.BlockdevFind(disk) 372 if result is None: 373 return None 374 375 return result.ToDict()
  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM and ExtStorage disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    (disk, snap_name, snap_size) = params
    cfbd = objects.Disk.FromDict(disk)
    return backend.BlockdevSnapshot(cfbd, snap_name, snap_size)
389 390 @staticmethod
391 - def perspective_blockdev_grow(params):
392 """Grow a stack of devices. 393 394 """ 395 if len(params) < 5: 396 raise ValueError("Received only %s parameters in blockdev_grow," 397 " old master?" % len(params)) 398 cfbd = objects.Disk.FromDict(params[0]) 399 amount = params[1] 400 dryrun = params[2] 401 backingstore = params[3] 402 excl_stor = params[4] 403 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
404 405 @staticmethod
406 - def perspective_blockdev_close(params):
407 """Closes the given block devices. 408 409 """ 410 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 411 return backend.BlockdevClose(params[0], disks)
412 413 @staticmethod
414 - def perspective_blockdev_open(params):
415 """Opens the given block devices. 416 417 """ 418 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 419 exclusive = params[2] 420 return backend.BlockdevOpen(params[0], disks, exclusive)
421 422 @staticmethod
424 """Compute the sizes of the given block devices. 425 426 """ 427 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 428 return backend.BlockdevGetdimensions(disks)
429 430 @staticmethod
431 - def perspective_blockdev_setinfo(params):
432 """Sets metadata information on the given block device. 433 434 """ 435 (disk, info) = params 436 disk = objects.Disk.FromDict(disk) 437 return backend.BlockdevSetInfo(disk, info)
438 439 # blockdev/drbd specific methods ---------- 440 441 @staticmethod
443 """Disconnects the network connection of drbd disks. 444 445 Note that this is only valid for drbd disks, so the members of the 446 disk list must all be drbd devices. 447 448 """ 449 (disks,) = params 450 disks = [objects.Disk.FromDict(disk) for disk in disks] 451 return backend.DrbdDisconnectNet(disks)
452 453 @staticmethod
454 - def perspective_drbd_attach_net(params):
455 """Attaches the network connection of drbd disks. 456 457 Note that this is only valid for drbd disks, so the members of the 458 disk list must all be drbd devices. 459 460 """ 461 disks, multimaster = params 462 disks = [objects.Disk.FromDict(disk) for disk in disks] 463 return backend.DrbdAttachNet(disks, multimaster)
464 465 @staticmethod
466 - def perspective_drbd_wait_sync(params):
467 """Wait until DRBD disks are synched. 468 469 Note that this is only valid for drbd disks, so the members of the 470 disk list must all be drbd devices. 471 472 """ 473 (disks,) = params 474 disks = [objects.Disk.FromDict(disk) for disk in disks] 475 return backend.DrbdWaitSync(disks)
476 477 @staticmethod
479 """Checks if the drbd devices need activation 480 481 Note that this is only valid for drbd disks, so the members of the 482 disk list must all be drbd devices. 483 484 """ 485 (disks,) = params 486 disks = [objects.Disk.FromDict(disk) for disk in disks] 487 return backend.DrbdNeedsActivation(disks)
488 489 @staticmethod
491 """Query drbd helper. 492 493 """ 494 return backend.GetDrbdUsermodeHelper()
495 496 # export/import -------------------------- 497 498 @staticmethod
499 - def perspective_finalize_export(params):
500 """Expose the finalize export functionality. 501 502 """ 503 instance = objects.Instance.FromDict(params[0]) 504 505 snap_disks = [] 506 for disk in params[1]: 507 if isinstance(disk, bool): 508 snap_disks.append(disk) 509 else: 510 snap_disks.append(objects.Disk.FromDict(disk)) 511 512 return backend.FinalizeExport(instance, snap_disks)
513 514 @staticmethod
515 - def perspective_export_info(params):
516 """Query information about an existing export on this node. 517 518 The given path may not contain an export, in which case we return 519 None. 520 521 """ 522 path = params[0] 523 return backend.ExportInfo(path)
524 525 @staticmethod
526 - def perspective_export_list(params):
527 """List the available exports on this node. 528 529 Note that as opposed to export_info, which may query data about an 530 export in any path, this only queries the standard Ganeti path 531 (pathutils.EXPORT_DIR). 532 533 """ 534 return backend.ListExports()
535 536 @staticmethod
537 - def perspective_export_remove(params):
538 """Remove an export. 539 540 """ 541 export = params[0] 542 return backend.RemoveExport(export)
543 544 # block device --------------------- 545 @staticmethod
546 - def perspective_bdev_sizes(params):
547 """Query the list of block devices 548 549 """ 550 devices = params[0] 551 return backend.GetBlockDevSizes(devices)
552 553 # volume -------------------------- 554 555 @staticmethod
556 - def perspective_lv_list(params):
557 """Query the list of logical volumes in a given volume group. 558 559 """ 560 vgname = params[0] 561 return backend.GetVolumeList(vgname)
  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    # params is unused; the RPC layer always passes an argument list
    return backend.ListVolumeGroups()
569 570 # Storage -------------------------- 571 572 @staticmethod
573 - def perspective_storage_list(params):
574 """Get list of storage units. 575 576 """ 577 (su_name, su_args, name, fields) = params 578 return container.GetStorage(su_name, *su_args).List(name, fields)
579 580 @staticmethod
581 - def perspective_storage_modify(params):
582 """Modify a storage unit. 583 584 """ 585 (su_name, su_args, name, changes) = params 586 return container.GetStorage(su_name, *su_args).Modify(name, changes)
587 588 @staticmethod
589 - def perspective_storage_execute(params):
590 """Execute an operation on a storage unit. 591 592 """ 593 (su_name, su_args, name, op) = params 594 return container.GetStorage(su_name, *su_args).Execute(name, op)
595 596 # bridge -------------------------- 597 598 @staticmethod
599 - def perspective_bridges_exist(params):
600 """Check if all bridges given exist on this node. 601 602 """ 603 bridges_list = params[0] 604 return backend.BridgesExist(bridges_list)
605 606 # instance -------------------------- 607 608 @staticmethod
609 - def perspective_instance_os_add(params):
610 """Install an OS on a given instance. 611 612 """ 613 inst_s = params[0] 614 inst = objects.Instance.FromDict(inst_s) 615 reinstall = params[1] 616 debug = params[2] 617 return backend.InstanceOsAdd(inst, reinstall, debug)
618 619 @staticmethod
621 """Runs the OS rename script for an instance. 622 623 """ 624 inst_s, old_name, debug = params 625 inst = objects.Instance.FromDict(inst_s) 626 return backend.RunRenameInstance(inst, old_name, debug)
627 628 @staticmethod
629 - def perspective_instance_shutdown(params):
630 """Shutdown an instance. 631 632 """ 633 instance = objects.Instance.FromDict(params[0]) 634 timeout = params[1] 635 trail = params[2] 636 _extendReasonTrail(trail, "shutdown") 637 return backend.InstanceShutdown(instance, timeout, trail)
638 639 @staticmethod
640 - def perspective_instance_start(params):
641 """Start an instance. 642 643 """ 644 (instance_name, startup_paused, trail) = params 645 instance = objects.Instance.FromDict(instance_name) 646 _extendReasonTrail(trail, "start") 647 return backend.StartInstance(instance, startup_paused, trail)
  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs device to a running instance.

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      # NOTE(review): if this assert passes (or is stripped with -O) for
      # a target other than DISK/NIC, "device" is unbound below --
      # confirm HOTPLUG_ALL_TARGETS only contains the two handled types
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
663 664 @staticmethod
665 - def perspective_hotplug_supported(params):
666 """Checks if hotplug is supported. 667 668 """ 669 instance = objects.Instance.FromDict(params[0]) 670 return backend.HotplugSupported(instance)
671 672 @staticmethod
674 """Modify instance metadata. 675 676 """ 677 instance = params[0] 678 return backend.ModifyInstanceMetadata(instance)
679 680 @staticmethod
681 - def perspective_migration_info(params):
682 """Gather information about an instance to be migrated. 683 684 """ 685 instance = objects.Instance.FromDict(params[0]) 686 return backend.MigrationInfo(instance)
687 688 @staticmethod
689 - def perspective_accept_instance(params):
690 """Prepare the node to accept an instance. 691 692 """ 693 instance, info, target = params 694 instance = objects.Instance.FromDict(instance) 695 return backend.AcceptInstance(instance, info, target)
696 697 @staticmethod
699 """Finalize the instance migration on the destination node. 700 701 """ 702 instance, info, success = params 703 instance = objects.Instance.FromDict(instance) 704 return backend.FinalizeMigrationDst(instance, info, success)
705 706 @staticmethod
707 - def perspective_instance_migrate(params):
708 """Migrates an instance. 709 710 """ 711 cluster_name, instance, target, live = params 712 instance = objects.Instance.FromDict(instance) 713 return backend.MigrateInstance(cluster_name, instance, target, live)
714 715 @staticmethod
717 """Finalize the instance migration on the source node. 718 719 """ 720 instance, success, live = params 721 instance = objects.Instance.FromDict(instance) 722 return backend.FinalizeMigrationSource(instance, success, live)
723 724 @staticmethod
726 """Reports migration status. 727 728 """ 729 instance = objects.Instance.FromDict(params[0]) 730 return backend.GetMigrationStatus(instance).ToDict()
731 732 @staticmethod
733 - def perspective_instance_reboot(params):
734 """Reboot an instance. 735 736 """ 737 instance = objects.Instance.FromDict(params[0]) 738 reboot_type = params[1] 739 shutdown_timeout = params[2] 740 trail = params[3] 741 _extendReasonTrail(trail, "reboot") 742 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 743 trail)
744 745 @staticmethod
747 """Modify instance runtime memory. 748 749 """ 750 instance_dict, memory = params 751 instance = objects.Instance.FromDict(instance_dict) 752 return backend.InstanceBalloonMemory(instance, memory)
753 754 @staticmethod
755 - def perspective_instance_info(params):
756 """Query instance information. 757 758 """ 759 (instance_name, hypervisor_name, hvparams) = params 760 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
761 762 @staticmethod
764 """Query whether the specified instance can be migrated. 765 766 """ 767 instance = objects.Instance.FromDict(params[0]) 768 return backend.GetInstanceMigratable(instance)
769 770 @staticmethod
772 """Query information about all instances. 773 774 """ 775 (hypervisor_list, all_hvparams) = params 776 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
777 778 @staticmethod
780 """Query information on how to get console access to instances 781 782 """ 783 return backend.GetInstanceConsoleInfo(params)
784 785 @staticmethod
786 - def perspective_instance_list(params):
787 """Query the list of running instances. 788 789 """ 790 (hypervisor_list, hvparams) = params 791 return backend.GetInstanceList(hypervisor_list, hvparams)
792 793 # node -------------------------- 794 795 @staticmethod
797 """Checks if a node has the given ip address. 798 799 """ 800 return netutils.IPAddress.Own(params[0])
801 802 @staticmethod
803 - def perspective_node_info(params):
804 """Query node information. 805 806 """ 807 (storage_units, hv_specs) = params 808 return backend.GetNodeInfo(storage_units, hv_specs)
809 810 @staticmethod
811 - def perspective_etc_hosts_modify(params):
812 """Modify a node entry in /etc/hosts. 813 814 """ 815 backend.EtcHostsModify(params[0], params[1], params[2]) 816 817 return True
818 819 @staticmethod
820 - def perspective_node_verify(params):
821 """Run a verify sequence on this node. 822 823 """ 824 (what, cluster_name, hvparams) = params 825 return backend.VerifyNode(what, cluster_name, hvparams)
826 827 @classmethod
828 - def perspective_node_verify_light(cls, params):
829 """Run a light verify sequence on this node. 830 831 This call is meant to perform a less strict verification of the node in 832 certain situations. Right now, it is invoked only when a node is just about 833 to be added to a cluster, and even then, it performs the same checks as 834 L{perspective_node_verify}. 835 """ 836 return cls.perspective_node_verify(params)
837 838 @staticmethod
840 """Start the master daemons on this node. 841 842 """ 843 return backend.StartMasterDaemons(params[0])
844 845 @staticmethod
847 """Activate the master IP on this node. 848 849 """ 850 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 851 return backend.ActivateMasterIp(master_params, params[1])
852 853 @staticmethod
855 """Deactivate the master IP on this node. 856 857 """ 858 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 859 return backend.DeactivateMasterIp(master_params, params[1])
860 861 @staticmethod
862 - def perspective_node_stop_master(params):
863 """Stops master daemons on this node. 864 865 """ 866 return backend.StopMasterDaemons()
867 868 @staticmethod
870 """Change the master IP netmask. 871 872 """ 873 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 874 params[3])
875 876 @staticmethod
878 """Cleanup after leaving a cluster. 879 880 """ 881 return backend.LeaveCluster(params[0])
882 883 @staticmethod
884 - def perspective_node_volumes(params):
885 """Query the list of all logical volume groups. 886 887 """ 888 return backend.NodeVolumes()
889 890 @staticmethod
892 """Demote a node from the master candidate role. 893 894 """ 895 return backend.DemoteFromMC()
896 897 @staticmethod
898 - def perspective_node_powercycle(params):
899 """Tries to powercycle the node. 900 901 """ 902 (hypervisor_type, hvparams) = params 903 return backend.PowercycleNode(hypervisor_type, hvparams)
904 905 @staticmethod
907 """Sets up OpenvSwitch on the node. 908 909 """ 910 (ovs_name, ovs_link) = params 911 return backend.ConfigureOVS(ovs_name, ovs_link)
912 913 @staticmethod
915 """Gets the node's public crypto tokens. 916 917 """ 918 token_requests = params[0] 919 return backend.GetCryptoTokens(token_requests)
920 921 @staticmethod
923 """Ensure daemon is running. 924 925 """ 926 (daemon_name, run) = params 927 return backend.EnsureDaemon(daemon_name, run)
928 929 @staticmethod
930 - def perspective_node_ssh_key_add(params):
931 """Distributes a new node's SSH key if authorized. 932 933 """ 934 (node_uuid, node_name, potential_master_candidates, 935 to_authorized_keys, to_public_keys, get_public_keys, 936 debug, verbose) = params 937 return backend.AddNodeSshKey(node_uuid, node_name, 938 potential_master_candidates, 939 to_authorized_keys=to_authorized_keys, 940 to_public_keys=to_public_keys, 941 get_public_keys=get_public_keys, 942 ssh_update_debug=debug, 943 ssh_update_verbose=verbose)
944 945 @staticmethod
947 """Generates a new root SSH key pair on the node. 948 949 """ 950 (node_uuids, node_names, master_candidate_uuids, 951 potential_master_candidates, old_key_type, new_key_type, 952 new_key_bits, debug, verbose) = params 953 return backend.RenewSshKeys(node_uuids, node_names, master_candidate_uuids, 954 potential_master_candidates, old_key_type, 955 new_key_type, new_key_bits, 956 ssh_update_debug=debug, 957 ssh_update_verbose=verbose)
958 959 @staticmethod
961 """Removes a node's SSH key from the other nodes' SSH files. 962 963 """ 964 (node_uuid, node_name, 965 master_candidate_uuids, potential_master_candidates, 966 from_authorized_keys, from_public_keys, clear_authorized_keys, 967 clear_public_keys, readd, debug, verbose) = params 968 return backend.RemoveNodeSshKey(node_uuid, node_name, 969 master_candidate_uuids, 970 potential_master_candidates, 971 from_authorized_keys=from_authorized_keys, 972 from_public_keys=from_public_keys, 973 clear_authorized_keys=clear_authorized_keys, 974 clear_public_keys=clear_public_keys, 975 readd=readd, 976 ssh_update_debug=debug, 977 ssh_update_verbose=verbose)
978 979 @staticmethod
981 """Removes a node's SSH key from the master's public key file. 982 983 """ 984 (node_name, ) = params 985 return backend.RemoveSshKeyFromPublicKeyFile(node_name)
  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    # params is unused; the protocol version is a module-level constant
    return constants.PROTOCOL_VERSION
995 996 @staticmethod
997 - def perspective_upload_file(params):
998 """Upload a file. 999 1000 Note that the backend implementation imposes strict rules on which 1001 files are accepted. 1002 1003 """ 1004 return backend.UploadFile(*(params[0]))
1005 1006 @staticmethod
1008 """Upload a file. 1009 1010 Note that the backend implementation imposes strict rules on which 1011 files are accepted. 1012 1013 """ 1014 return backend.UploadFile(*params)
1015 1016 @staticmethod
1017 - def perspective_master_node_name(params):
1018 """Returns the master node name. 1019 1020 """ 1021 return backend.GetMasterNodeName()
1022 1023 @staticmethod
1024 - def perspective_run_oob(params):
1025 """Runs oob on node. 1026 1027 """ 1028 output = backend.RunOob(params[0], params[1], params[2], params[3]) 1029 if output: 1030 result = serializer.LoadJson(output) 1031 else: 1032 result = None 1033 return result
1034 1035 @staticmethod
1037 """Runs a restricted command. 1038 1039 """ 1040 (cmd, ) = params 1041 1042 return backend.RunConstrainedCmd( 1043 cmd, 1044 lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE, 1045 path=pathutils.RESTRICTED_COMMANDS_DIR)
1046 1047 @staticmethod
1048 - def perspective_repair_command(params):
1049 """ Run a repair command. 1050 1051 """ 1052 (cmd, inp, ) = params 1053 1054 return backend.RunConstrainedCmd( 1055 cmd, 1056 lock_file=pathutils.REPAIR_COMMANDS_LOCK_FILE, 1057 path=pathutils.REPAIR_COMMANDS_DIR, 1058 inp=inp)
1059 1060 @staticmethod
1062 """Write ssconf files. 1063 1064 """ 1065 (values,) = params 1066 return ssconf.WriteSsconfFiles(values)
1067 1068 @staticmethod
1069 - def perspective_get_watcher_pause(params):
1070 """Get watcher pause end. 1071 1072 """ 1073 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
1074 1075 @staticmethod
1076 - def perspective_set_watcher_pause(params):
1077 """Set watcher pause. 1078 1079 """ 1080 (until, ) = params 1081 return backend.SetWatcherPause(until)
1082 1083 @staticmethod
1084 - def perspective_get_file_info(params):
1085 """Get info on whether a file exists and its properties. 1086 1087 """ 1088 (path, ) = params 1089 return backend.GetFileInfo(path)
1090 1091 # os ----------------------- 1092 1093 @staticmethod
1094 - def perspective_os_diagnose(params):
1095 """Query detailed information about existing OSes. 1096 1097 """ 1098 return backend.DiagnoseOS()
1099 1100 @staticmethod
1101 - def perspective_os_validate(params):
1102 """Run a given OS' validation routine. 1103 1104 """ 1105 required, name, checks, params, force_variant = params 1106 return backend.ValidateOS(required, name, checks, params, force_variant)
1107 1108 @staticmethod
1109 - def perspective_os_export(params):
1110 """Export an OS definition into an instance specific package. 1111 1112 """ 1113 instance = objects.Instance.FromDict(params[0]) 1114 override_env = params[1] 1115 return backend.ExportOS(instance, override_env)
1116 1117 # extstorage ----------------------- 1118 1119 @staticmethod
1121 """Query detailed information about existing extstorage providers. 1122 1123 """ 1124 return backend.DiagnoseExtStorage()
1125 1126 # hooks ----------------------- 1127 1128 @staticmethod
1129 - def perspective_hooks_runner(params):
1130 """Run hook scripts. 1131 1132 """ 1133 hpath, phase, env = params 1134 hr = backend.HooksRunner() 1135 return hr.RunHooks(hpath, phase, env)
1136 1137 # iallocator ----------------- 1138 1139 @staticmethod
1140 - def perspective_iallocator_runner(params):
1141 """Run an iallocator script. 1142 1143 """ 1144 name, idata, ial_params_dict = params 1145 ial_params = [] 1146 for ial_param in ial_params_dict.items(): 1147 if ial_param[1] is not None: 1148 ial_params.append("--" + ial_param[0] + "=" + ial_param[1]) 1149 else: 1150 ial_params.append("--" + ial_param[0]) 1151 iar = backend.IAllocatorRunner() 1152 return iar.Run(name, idata, ial_params)
1153 1154 # test ----------------------- 1155 1156 @staticmethod
1157 - def perspective_test_delay(params):
1158 """Run test delay. 1159 1160 """ 1161 duration = params[0] 1162 status, rval = utils.TestDelay(duration) 1163 if not status: 1164 raise backend.RPCFail(rval) 1165 return rval
1166 1167 # file storage --------------- 1168 1169 @staticmethod
1171 """Create the file storage directory. 1172 1173 """ 1174 file_storage_dir = params[0] 1175 return backend.CreateFileStorageDir(file_storage_dir)
1176 1177 @staticmethod
1179 """Remove the file storage directory. 1180 1181 """ 1182 file_storage_dir = params[0] 1183 return backend.RemoveFileStorageDir(file_storage_dir)
1184 1185 @staticmethod
1187 """Rename the file storage directory. 1188 1189 """ 1190 old_file_storage_dir = params[0] 1191 new_file_storage_dir = params[1] 1192 return backend.RenameFileStorageDir(old_file_storage_dir, 1193 new_file_storage_dir)
1194 1195 # jobs ------------------------ 1196 1197 @staticmethod 1198 @_RequireJobQueueLock
1199 - def perspective_jobqueue_update(params):
1200 """Update job queue. 1201 1202 """ 1203 (file_name, content) = params 1204 return backend.JobQueueUpdate(file_name, content)
1205 1206 @staticmethod 1207 @_RequireJobQueueLock
1208 - def perspective_jobqueue_purge(params):
1209 """Purge job queue. 1210 1211 """ 1212 return backend.JobQueuePurge()
1213 1214 @staticmethod 1215 @_RequireJobQueueLock
1216 - def perspective_jobqueue_rename(params):
1217 """Rename a job queue file. 1218 1219 """ 1220 # TODO: What if a file fails to rename? 1221 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1222 1223 @staticmethod 1224 @_RequireJobQueueLock
1226 """Set job queue's drain flag. 1227 1228 """ 1229 (flag, ) = params 1230 1231 return jstore.SetDrainFlag(flag)
1232 1233 # hypervisor --------------- 1234 1235 @staticmethod
1237 """Validate the hypervisor parameters. 1238 1239 """ 1240 (hvname, hvparams) = params 1241 return backend.ValidateHVParams(hvname, hvparams)
1242 1243 # Crypto 1244 1245 @staticmethod
1246 - def perspective_x509_cert_create(params):
1247 """Creates a new X509 certificate for SSL/TLS. 1248 1249 """ 1250 (validity, ) = params 1251 return backend.CreateX509Certificate(validity)
1252 1253 @staticmethod
1254 - def perspective_x509_cert_remove(params):
1255 """Removes a X509 certificate. 1256 1257 """ 1258 (name, ) = params 1259 return backend.RemoveX509Certificate(name)
1260 1261 # Import and export 1262 1263 @staticmethod
1264 - def perspective_import_start(params):
1265 """Starts an import daemon. 1266 1267 """ 1268 (opts_s, instance, component, (dest, dest_args)) = params 1269 1270 opts = objects.ImportExportOptions.FromDict(opts_s) 1271 1272 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1273 None, None, 1274 objects.Instance.FromDict(instance), 1275 component, dest, 1276 _DecodeImportExportIO(dest, 1277 dest_args))
1278 1279 @staticmethod
1280 - def perspective_export_start(params):
1281 """Starts an export daemon. 1282 1283 """ 1284 (opts_s, host, port, instance, component, (source, source_args)) = params 1285 1286 opts = objects.ImportExportOptions.FromDict(opts_s) 1287 1288 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1289 host, port, 1290 objects.Instance.FromDict(instance), 1291 component, source, 1292 _DecodeImportExportIO(source, 1293 source_args))
1294 1295 @staticmethod
1296 - def perspective_impexp_status(params):
1297 """Retrieves the status of an import or export daemon. 1298 1299 """ 1300 return backend.GetImportExportStatus(params[0])
1301 1302 @staticmethod
1303 - def perspective_impexp_abort(params):
1304 """Aborts an import or export. 1305 1306 """ 1307 return backend.AbortImportExport(params[0])
1308 1309 @staticmethod
1310 - def perspective_impexp_cleanup(params):
1311 """Cleans up after an import or export. 1312 1313 """ 1314 return backend.CleanupImportExport(params[0])
1315
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  @param _: unused (parsed options; this check only needs the
    positional arguments)
  @param args: leftover positional command line arguments; noded
    accepts none, so any content is a usage error

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # the string-escape codec is required later; verify up front that the
    # Python installation provides it so we fail early with a clear message
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1332
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate
  @type errdepth: integer
  @param errdepth: number of the step in the certificate chain starting at 0
    for the actual client certificate.

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613

  # The incoming certificate's digest is needed on every path below;
  # compute it only once instead of on each comparison/log statement.
  peer_digest = cert.digest("sha1")

  # If we receive a certificate from the certificate chain that is higher
  # than the lowest element of the chain, we have to check it against the
  # server certificate.
  if errdepth > 0:
    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    match = peer_digest == server_digest
    if not match:
      # fixed typo in the original message ("certficate")
      logging.debug("Received certificate from the certificate chain, which"
                    " does not match the server certificate. Digest of the"
                    " received certificate: %s. Digest of the server"
                    " certificate: %s.", peer_digest, server_digest)
    return match
  elif errdepth == 0:
    sstore = ssconf.SimpleStore()
    try:
      candidate_certs = sstore.GetMasterCandidatesCertMap()
    except errors.ConfigurationError:
      logging.info("No candidate certificates found. Switching to "
                   "bootstrap/update mode.")
      candidate_certs = None
    if not candidate_certs:
      # Bootstrap/upgrade phase: fall back to trusting this node's own
      # server certificate (see docstring above)
      candidate_certs = {
        constants.CRYPTO_BOOTSTRAP: utils.GetCertificateDigest(
          cert_filename=pathutils.NODED_CERT_FILE)}
    match = peer_digest in candidate_certs.values()
    if not match:
      logging.debug("Received certificate which is not a certificate of a"
                    " master candidate. Certificate digest: %s. List of master"
                    " candidate certificate digests: %s.", peer_digest,
                    str(candidate_certs))
    return match
  else:
    logging.error("Invalid errdepth value: %s.", errdepth)
    return False
1404 # pylint: enable=W0613
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: (mainloop, server) tuple, consumed by L{ExecNoded}

  """
  # Start from the plain executor and upgrade to the memory-locking one
  # only if requested and actually supported by this system
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()
  mainloop = daemon.Mainloop()

  server = http.server.HttpServer(
    mainloop, options.bind_address, options.port, handler,
    ssl_params=ssl_params, ssl_verify_peer=True,
    request_executor_class=request_executor_class,
    ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)
1446
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: (mainloop, server) tuple returned by L{PrepNoded}

  """
  (main_loop, http_server) = prep_data
  try:
    main_loop.Run()
  finally:
    # always shut the server down, even if the main loop raises
    http_server.Stop()
def Main():
  """Main function for the node daemon.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(
    description="Ganeti node daemon",
    usage="%prog [-f] [-d] [-p port] [-b ADDRESS] [-i INTERFACE]",
    version=version_text)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True,
                     warn_breach=True)
1477