Package ganeti :: Package server :: Module noded
[hide private]
[frames | no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
  41  import os 
  42  import sys 
  43  import logging 
  44  import signal 
  45  import codecs 
  46   
  47  from optparse import OptionParser 
  48   
  49  from ganeti import backend 
  50  from ganeti import constants 
  51  from ganeti import objects 
  52  from ganeti import errors 
  53  from ganeti import jstore 
  54  from ganeti import daemon 
  55  from ganeti import http 
  56  from ganeti import utils 
  57  from ganeti.storage import container 
  58  from ganeti import serializer 
  59  from ganeti import netutils 
  60  from ganeti import pathutils 
  61  from ganeti import ssconf 
  62   
  63  import ganeti.http.server # pylint: disable=W0611 
  64   
  65   
  66  queue_lock = None 
def _extendReasonTrail(trail, source, reason=""):
  """Append a noded-originated entry to a reason trail.

  The new entry is a tuple made of the source label (the noded reason source
  constant and C{source}, joined by a colon), the reason text and the value
  returned by C{utils.EpochNano()}. The trail is modified in place.

  @param trail: reason trail to extend; must not be None
  @param source: name of the noded functionality extending the trail
  @param reason: reason text stored in the new entry

  """
  assert trail is not None
  label = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  entry = (label, reason, utils.EpochNano())
  trail.append(entry)
77
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  The lock is obtained from C{jstore.InitAndVerifyQueue} on the first
  successful call and cached in the module-level C{queue_lock}; once it is
  set, later calls return immediately.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
  except EnvironmentError as err:
    # The "as" form is accepted by Python 2.6+ and Python 3 alike; the error
    # is handed back to the caller instead of being raised
    return err

  return None
96
97 98 -def _RequireJobQueueLock(fn):
99 """Decorator for job queue manipulating functions. 100 101 """ 102 QUEUE_LOCK_TIMEOUT = 10 103 104 def wrapper(*args, **kwargs): 105 # Locking in exclusive, blocking mode because there could be several 106 # children running at the same time. Waiting up to 10 seconds. 107 if _PrepareQueueLock() is not None: 108 raise errors.JobQueueError("Job queue failed initialization," 109 " cannot update jobs") 110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) 111 try: 112 return fn(*args, **kwargs) 113 finally: 114 queue_lock.Unlock()
115 116 return wrapper 117
def _DecodeImportExportIO(ieio, ieioargs):
  """Decode the arguments of an import/export I/O specification.

  For raw-disk and script I/O the first argument is a serialized disk and is
  turned into an C{objects.Disk}; for any other I/O type the arguments are
  returned unchanged.

  @param ieio: import/export I/O type
  @param ieioargs: arguments belonging to the I/O type
  @return: the decoded arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    return ieioargs
132
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Request executor that calls C{utils.Mlockall} before serving.

  """
  def __init__(self, *args, **kwargs):
    """Call C{utils.Mlockall}, then initialize the base request executor.

    All arguments are forwarded unchanged to the base class.

    """
    utils.Mlockall()

    base = http.server.HttpServerRequestExecutor
    base.__init__(self, *args, **kwargs)
159
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
def __init__(self):
  """Initialize the base handler and record the PID of this process.

  The PID is stored in C{self.noded_pid}.

  """
  http.server.HttpServerHandler.__init__(self)
  self.noded_pid = os.getpid()
173
def HandleRequest(self, req):
  """Handle a request.

  Only POST requests are accepted. The request path, with one leading slash
  removed, selects the C{perspective_*} method to call; the JSON-decoded
  request body is passed to it as the single argument.

  @param req: request object; its C{request_method}, C{request_path} and
      C{request_body} attributes are read
  @return: JSON-serialized result; a C{(success, payload)} pair, or the
      arguments of a C{errors.QuitGanetiException}
  @raise http.HttpBadRequest: if the request method is not POST
  @raise http.HttpNotFound: if no matching C{perspective_*} method exists

  """

  if req.request_method.upper() != http.HTTP_POST:
    raise http.HttpBadRequest("Only the POST method is supported")

  path = req.request_path
  if path.startswith("/"):
    path = path[1:]

  method = getattr(self, "perspective_%s" % path, None)
  if method is None:
    raise http.HttpNotFound()

  try:
    result = (True, method(serializer.LoadJson(req.request_body)))

  except backend.RPCFail as err:
    # our custom failure exception; str(err) works fine if the
    # exception was constructed with a single argument, and in
    # this case, err.message == err.args[0] == str(err)
    result = (False, str(err))
  except errors.QuitGanetiException as err:
    # Tell parent to quit
    logging.info("Shutting down the node daemon, arguments: %s",
                 str(err.args))
    os.kill(self.noded_pid, signal.SIGTERM)
    # And return the error's arguments, which must be already in
    # correct tuple format
    result = err.args
  except Exception as err:
    # Any other failure is logged with its traceback and reported to the
    # caller as a failed call
    logging.exception("Error in RPC call")
    result = (False, "Error while executing backend function: %s" % str(err))

  return serializer.DumpJson(result)
# the new block devices --------------------------

@staticmethod
def perspective_blockdev_create(params):
  """Create a block device.

  @param params: (serialized disk, size, owner, on_primary, info, excl_stor)
  @return: result of C{backend.BlockdevCreate}
  @raise ValueError: if the disk deserializes to None

  """
  (disk_dict, size, owner, on_primary, info, excl_stor) = params
  disk = objects.Disk.FromDict(disk_dict)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevCreate(disk, size, owner, on_primary, info,
                                excl_stor)
225 226 @staticmethod
228 """Pause/resume sync of a block device. 229 230 """ 231 disks_s, pause = params 232 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 233 return backend.BlockdevPauseResumeSync(disks, pause)
@staticmethod
def perspective_blockdev_image(params):
  """Image a block device.

  @param params: (serialized disk, image, size)
  @return: result of C{backend.BlockdevImage}

  """
  (disk_dict, image, size) = params
  disk = objects.Disk.FromDict(disk_dict)
  return backend.BlockdevImage(disk, image, size)
@staticmethod
def perspective_blockdev_wipe(params):
  """Wipe a block device.

  @param params: (serialized disk, offset, size)
  @return: result of C{backend.BlockdevWipe}

  """
  (disk_dict, offset, size) = params
  disk = objects.Disk.FromDict(disk_dict)
  return backend.BlockdevWipe(disk, offset, size)
@staticmethod
def perspective_blockdev_remove(params):
  """Remove a block device.

  @param params: sequence whose first element is the serialized disk
  @return: result of C{backend.BlockdevRemove}

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.BlockdevRemove(disk)
@staticmethod
def perspective_blockdev_rename(params):
  """Rename block devices.

  @param params: sequence whose first element is a list of
      (serialized disk, unique id) pairs
  @return: result of C{backend.BlockdevRename}

  """
  devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
  return backend.BlockdevRename(devlist)
@staticmethod
def perspective_blockdev_assemble(params):
  """Assemble a block device.

  @param params: (serialized disk, serialized instance, on_primary, idx)
  @return: result of C{backend.BlockdevAssemble}
  @raise ValueError: if the disk deserializes to None

  """
  (disk_dict, instance_dict, on_primary, idx) = params
  disk = objects.Disk.FromDict(disk_dict)
  instance = objects.Instance.FromDict(instance_dict)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevAssemble(disk, instance, on_primary, idx)
@staticmethod
def perspective_blockdev_shutdown(params):
  """Shut down a block device.

  @param params: sequence whose first element is the serialized disk
  @return: result of C{backend.BlockdevShutdown}
  @raise ValueError: if the disk deserializes to None

  """
  disk = objects.Disk.FromDict(params[0])
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevShutdown(disk)
292 293 @staticmethod
295 """Add a child to a mirror device. 296 297 Note: this is only valid for mirror devices. It's the caller's duty 298 to send a correct disk, otherwise we raise an error. 299 300 """ 301 bdev_s, ndev_s = params 302 bdev = objects.Disk.FromDict(bdev_s) 303 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 304 if bdev is None or ndevs.count(None) > 0: 305 raise ValueError("can't unserialize data!") 306 return backend.BlockdevAddchildren(bdev, ndevs)
307 308 @staticmethod
310 """Remove a child from a mirror device. 311 312 This is only valid for mirror devices, of course. It's the callers 313 duty to send a correct disk, otherwise we raise an error. 314 315 """ 316 bdev_s, ndev_s = params 317 bdev = objects.Disk.FromDict(bdev_s) 318 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 319 if bdev is None or ndevs.count(None) > 0: 320 raise ValueError("can't unserialize data!") 321 return backend.BlockdevRemovechildren(bdev, ndevs)
322 323 @staticmethod
325 """Return the mirror status for a list of disks. 326 327 """ 328 disks = [objects.Disk.FromDict(dsk_s) 329 for dsk_s in params[0]] 330 return [status.ToDict() 331 for status in backend.BlockdevGetmirrorstatus(disks)]
332 333 @staticmethod
335 """Return the mirror status for a list of disks. 336 337 """ 338 (node_disks, ) = params 339 340 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 341 342 result = [] 343 344 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 345 if success: 346 result.append((success, status.ToDict())) 347 else: 348 result.append((success, status)) 349 350 return result
@staticmethod
def perspective_blockdev_find(params):
  """Look up a disk through C{backend.BlockdevFind}.

  @param params: sequence whose first element is the serialized disk
  @return: None if the backend returns None, otherwise the backend result
      converted with C{ToDict}

  """
  found = backend.BlockdevFind(objects.Disk.FromDict(params[0]))
  if found is not None:
    return found.ToDict()

  return None
@staticmethod
def perspective_blockdev_snapshot(params):
  """Create a snapshot device.

  @param params: sequence whose first element is the serialized disk
  @return: result of C{backend.BlockdevSnapshot}

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.BlockdevSnapshot(disk)
@staticmethod
def perspective_blockdev_grow(params):
  """Grow a stack of devices.

  @param params: sequence of at least five elements: serialized disk,
      amount, dryrun, backingstore and excl_stor
  @return: result of C{backend.BlockdevGrow}
  @raise ValueError: if fewer than five parameters were received

  """
  received = len(params)
  if received < 5:
    raise ValueError("Received only %s parameters in blockdev_grow,"
                     " old master?" % received)
  disk = objects.Disk.FromDict(params[0])
  # positions 1-4: amount, dryrun, backingstore, excl_stor
  return backend.BlockdevGrow(disk, params[1], params[2], params[3],
                              params[4])
@staticmethod
def perspective_blockdev_close(params):
  """Close the given block devices.

  @param params: sequence whose first element is passed through to the
      backend and whose second element is a list of serialized disks
  @return: result of C{backend.BlockdevClose}

  """
  disks = [objects.Disk.FromDict(disk_dict) for disk_dict in params[1]]
  return backend.BlockdevClose(params[0], disks)
401 402 @staticmethod
404 """Compute the sizes of the given block devices. 405 406 """ 407 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 408 return backend.BlockdevGetdimensions(disks)
@staticmethod
def perspective_blockdev_setinfo(params):
  """Set metadata information on the given block device.

  @param params: (serialized disk, info)
  @return: result of C{backend.BlockdevSetInfo}

  """
  (disk_dict, info) = params
  return backend.BlockdevSetInfo(objects.Disk.FromDict(disk_dict), info)
418 419 # blockdev/drbd specific methods ---------- 420 421 @staticmethod
423 """Disconnects the network connection of drbd disks. 424 425 Note that this is only valid for drbd disks, so the members of the 426 disk list must all be drbd devices. 427 428 """ 429 (disks,) = params 430 disks = [objects.Disk.FromDict(disk) for disk in disks] 431 return backend.DrbdDisconnectNet(disks)
@staticmethod
def perspective_drbd_attach_net(params):
  """Attach the network connection of drbd disks.

  @param params: (list of serialized disks, instance name, multimaster)
  @return: result of C{backend.DrbdAttachNet}

  """
  (disk_dicts, instance_name, multimaster) = params
  disks = [objects.Disk.FromDict(disk_dict) for disk_dict in disk_dicts]
  return backend.DrbdAttachNet(disks, instance_name, multimaster)
@staticmethod
def perspective_drbd_wait_sync(params):
  """Wait until DRBD disks are synched.

  @param params: one-element sequence holding a list of serialized disks
  @return: result of C{backend.DrbdWaitSync}

  """
  (disk_dicts, ) = params
  disks = [objects.Disk.FromDict(disk_dict) for disk_dict in disk_dicts]
  return backend.DrbdWaitSync(disks)
456 457 @staticmethod
459 """Checks if the drbd devices need activation 460 461 Note that this is only valid for drbd disks, so the members of the 462 disk list must all be drbd devices. 463 464 """ 465 (disks,) = params 466 disks = [objects.Disk.FromDict(disk) for disk in disks] 467 return backend.DrbdNeedsActivation(disks)
468 469 @staticmethod
471 """Query drbd helper. 472 473 """ 474 return backend.GetDrbdUsermodeHelper()
# export/import --------------------------

@staticmethod
def perspective_finalize_export(params):
  """Expose the finalize export functionality.

  @param params: sequence holding the serialized instance and a list whose
      entries are either booleans or serialized disks
  @return: result of C{backend.FinalizeExport}

  """
  instance = objects.Instance.FromDict(params[0])

  # Boolean entries are passed through as they are; everything else is
  # deserialized into a disk object
  snap_disks = [entry if isinstance(entry, bool)
                else objects.Disk.FromDict(entry)
                for entry in params[1]]

  return backend.FinalizeExport(instance, snap_disks)
@staticmethod
def perspective_export_info(params):
  """Query information about an existing export on this node.

  @param params: sequence whose first element is the export path
  @return: result of C{backend.ExportInfo}

  """
  return backend.ExportInfo(params[0])
@staticmethod
def perspective_export_list(params):
  """List the available exports on this node.

  @param params: unused
  @return: result of C{backend.ListExports}

  """
  return backend.ListExports()
@staticmethod
def perspective_export_remove(params):
  """Remove an export.

  @param params: sequence whose first element identifies the export
  @return: result of C{backend.RemoveExport}

  """
  return backend.RemoveExport(params[0])
# block device ---------------------
@staticmethod
def perspective_bdev_sizes(params):
  """Query the sizes of block devices.

  @param params: sequence whose first element is the list of devices
  @return: result of C{backend.GetBlockDevSizes}

  """
  return backend.GetBlockDevSizes(params[0])
# volume --------------------------

@staticmethod
def perspective_lv_list(params):
  """Query the list of logical volumes in a given volume group.

  @param params: sequence whose first element is the volume group name
  @return: result of C{backend.GetVolumeList}

  """
  return backend.GetVolumeList(params[0])
@staticmethod
def perspective_vg_list(params):
  """Query the list of volume groups.

  @param params: unused
  @return: result of C{backend.ListVolumeGroups}

  """
  return backend.ListVolumeGroups()
# Storage --------------------------

@staticmethod
def perspective_storage_list(params):
  """Get list of storage units.

  @param params: (storage unit name, storage unit arguments, name, fields)
  @return: result of the storage object's C{List} method

  """
  (su_name, su_args, name, fields) = params
  storage = container.GetStorage(su_name, *su_args)
  return storage.List(name, fields)
@staticmethod
def perspective_storage_modify(params):
  """Modify a storage unit.

  @param params: (storage unit name, storage unit arguments, name, changes)
  @return: result of the storage object's C{Modify} method

  """
  (su_name, su_args, name, changes) = params
  storage = container.GetStorage(su_name, *su_args)
  return storage.Modify(name, changes)
@staticmethod
def perspective_storage_execute(params):
  """Execute an operation on a storage unit.

  @param params: (storage unit name, storage unit arguments, name, op)
  @return: result of the storage object's C{Execute} method

  """
  (su_name, su_args, name, op) = params
  storage = container.GetStorage(su_name, *su_args)
  return storage.Execute(name, op)
# bridge --------------------------

@staticmethod
def perspective_bridges_exist(params):
  """Check if all bridges given exist on this node.

  @param params: sequence whose first element is the list of bridges
  @return: result of C{backend.BridgesExist}

  """
  return backend.BridgesExist(params[0])
# instance --------------------------

@staticmethod
def perspective_instance_os_add(params):
  """Install an OS on a given instance.

  @param params: sequence holding the serialized instance, the reinstall
      flag and the debug setting, in that order
  @return: result of C{backend.InstanceOsAdd}

  """
  instance = objects.Instance.FromDict(params[0])
  return backend.InstanceOsAdd(instance, params[1], params[2])
598 599 @staticmethod
601 """Runs the OS rename script for an instance. 602 603 """ 604 inst_s, old_name, debug = params 605 inst = objects.Instance.FromDict(inst_s) 606 return backend.RunRenameInstance(inst, old_name, debug)
@staticmethod
def perspective_instance_shutdown(params):
  """Shutdown an instance.

  The reason trail is extended with a noded "shutdown" entry before the
  backend is called.

  @param params: sequence holding the serialized instance, the timeout and
      the reason trail, in that order
  @return: result of C{backend.InstanceShutdown}

  """
  instance = objects.Instance.FromDict(params[0])
  timeout = params[1]
  reason_trail = params[2]
  _extendReasonTrail(reason_trail, "shutdown")
  return backend.InstanceShutdown(instance, timeout, reason_trail)
@staticmethod
def perspective_instance_start(params):
  """Start an instance.

  The reason trail is extended with a noded "start" entry before the backend
  is called.

  @param params: (serialized instance, startup_paused, reason trail)
  @return: result of C{backend.StartInstance}

  """
  (instance_dict, startup_paused, reason_trail) = params
  instance = objects.Instance.FromDict(instance_dict)
  _extendReasonTrail(reason_trail, "start")
  return backend.StartInstance(instance, startup_paused, reason_trail)
@staticmethod
def perspective_hotplug_device(params):
  """Hotplugs device to a running instance.

  @param params: (serialized instance, action, device type, serialized
      device, extra, seq)
  @return: result of C{backend.HotplugDevice}
  @raise ValueError: if the device type is neither the disk nor the NIC
      hotplug target

  """
  (idict, action, dev_type, ddict, extra, seq) = params
  instance = objects.Instance.FromDict(idict)
  if dev_type == constants.HOTPLUG_TARGET_DISK:
    device = objects.Disk.FromDict(ddict)
  elif dev_type == constants.HOTPLUG_TARGET_NIC:
    device = objects.NIC.FromDict(ddict)
  else:
    # No device object can be built for any other type. Fail explicitly
    # instead of relying on an assertion (stripped under -O), which would
    # leave "device" unbound for the call below.
    raise ValueError("Unsupported hotplug device type '%s'" % (dev_type, ))
  return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
@staticmethod
def perspective_hotplug_supported(params):
  """Check if hotplug is supported.

  @param params: sequence whose first element is the serialized instance
  @return: result of C{backend.HotplugSupported}

  """
  return backend.HotplugSupported(objects.Instance.FromDict(params[0]))
651 652 @staticmethod
654 """Modify instance metadata. 655 656 """ 657 instance = params[0] 658 return backend.ModifyInstanceMetadata(instance)
@staticmethod
def perspective_migration_info(params):
  """Gather information about an instance to be migrated.

  @param params: sequence whose first element is the serialized instance
  @return: result of C{backend.MigrationInfo}

  """
  return backend.MigrationInfo(objects.Instance.FromDict(params[0]))
@staticmethod
def perspective_accept_instance(params):
  """Prepare the node to accept an instance.

  @param params: (serialized instance, info, target)
  @return: result of C{backend.AcceptInstance}

  """
  (instance_dict, info, target) = params
  instance = objects.Instance.FromDict(instance_dict)
  return backend.AcceptInstance(instance, info, target)
676 677 @staticmethod
679 """Finalize the instance migration on the destination node. 680 681 """ 682 instance, info, success = params 683 instance = objects.Instance.FromDict(instance) 684 return backend.FinalizeMigrationDst(instance, info, success)
@staticmethod
def perspective_instance_migrate(params):
  """Migrate an instance.

  @param params: (cluster name, serialized instance, target, live)
  @return: result of C{backend.MigrateInstance}

  """
  (cluster_name, instance_dict, target, live) = params
  instance = objects.Instance.FromDict(instance_dict)
  return backend.MigrateInstance(cluster_name, instance, target, live)
694 695 @staticmethod
697 """Finalize the instance migration on the source node. 698 699 """ 700 instance, success, live = params 701 instance = objects.Instance.FromDict(instance) 702 return backend.FinalizeMigrationSource(instance, success, live)
703 704 @staticmethod
706 """Reports migration status. 707 708 """ 709 instance = objects.Instance.FromDict(params[0]) 710 return backend.GetMigrationStatus(instance).ToDict()
@staticmethod
def perspective_instance_reboot(params):
  """Reboot an instance.

  The reason trail is extended with a noded "reboot" entry before the
  backend is called.

  @param params: sequence holding the serialized instance, the reboot type,
      the shutdown timeout and the reason trail, in that order
  @return: result of C{backend.InstanceReboot}

  """
  instance = objects.Instance.FromDict(params[0])
  reboot_type = params[1]
  shutdown_timeout = params[2]
  reason_trail = params[3]
  _extendReasonTrail(reason_trail, "reboot")
  return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
                                reason_trail)
724 725 @staticmethod
727 """Modify instance runtime memory. 728 729 """ 730 instance_dict, memory = params 731 instance = objects.Instance.FromDict(instance_dict) 732 return backend.InstanceBalloonMemory(instance, memory)
@staticmethod
def perspective_instance_info(params):
  """Query instance information.

  @param params: (instance name, hypervisor name, hypervisor parameters)
  @return: result of C{backend.GetInstanceInfo}

  """
  (name, hv_name, hvparams) = params
  return backend.GetInstanceInfo(name, hv_name, hvparams)
741 742 @staticmethod
744 """Query whether the specified instance can be migrated. 745 746 """ 747 instance = objects.Instance.FromDict(params[0]) 748 return backend.GetInstanceMigratable(instance)
749 750 @staticmethod
752 """Query information about all instances. 753 754 """ 755 (hypervisor_list, all_hvparams) = params 756 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
757 758 @staticmethod
760 """Query information on how to get console access to instances 761 762 """ 763 return backend.GetInstanceConsoleInfo(params)
@staticmethod
def perspective_instance_list(params):
  """Query the list of running instances.

  @param params: (hypervisor list, hypervisor parameters)
  @return: result of C{backend.GetInstanceList}

  """
  (hypervisors, hvparams) = params
  return backend.GetInstanceList(hypervisors, hvparams)
772 773 # node -------------------------- 774 775 @staticmethod
777 """Checks if a node has the given ip address. 778 779 """ 780 return netutils.IPAddress.Own(params[0])
@staticmethod
def perspective_node_info(params):
  """Query node information.

  @param params: (storage units, hypervisor specifications)
  @return: result of C{backend.GetNodeInfo}

  """
  (units, specs) = params
  return backend.GetNodeInfo(units, specs)
@staticmethod
def perspective_etc_hosts_modify(params):
  """Modify a node entry in /etc/hosts.

  @param params: sequence whose first three elements are passed, in order,
      to C{backend.EtcHostsModify}
  @return: always True; the backend's return value is discarded

  """
  backend.EtcHostsModify(params[0], params[1], params[2])
  return True
@staticmethod
def perspective_node_verify(params):
  """Run a verify sequence on this node.

  @param params: (what, cluster name, hypervisor parameters, node groups,
      groups configuration)
  @return: result of C{backend.VerifyNode}

  """
  (what, cluster_name, hvparams, node_groups, groups_cfg) = params
  return backend.VerifyNode(what, cluster_name, hvparams, node_groups,
                            groups_cfg)
@classmethod
def perspective_node_verify_light(cls, params):
  """Run a light verify sequence on this node.

  This currently delegates to L{perspective_node_verify} with the same
  parameters, so the same checks are performed.

  @param params: passed unchanged to L{perspective_node_verify}
  @return: result of L{perspective_node_verify}

  """
  return cls.perspective_node_verify(params)
818 819 @staticmethod
821 """Start the master daemons on this node. 822 823 """ 824 return backend.StartMasterDaemons(params[0])
825 826 @staticmethod
828 """Activate the master IP on this node. 829 830 """ 831 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 832 return backend.ActivateMasterIp(master_params, params[1])
833 834 @staticmethod
836 """Deactivate the master IP on this node. 837 838 """ 839 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 840 return backend.DeactivateMasterIp(master_params, params[1])
@staticmethod
def perspective_node_stop_master(params):
  """Stop master daemons on this node.

  @param params: unused
  @return: result of C{backend.StopMasterDaemons}

  """
  return backend.StopMasterDaemons()
848 849 @staticmethod
851 """Change the master IP netmask. 852 853 """ 854 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 855 params[3])
856 857 @staticmethod
859 """Cleanup after leaving a cluster. 860 861 """ 862 return backend.LeaveCluster(params[0])
@staticmethod
def perspective_node_volumes(params):
  """Query the volumes of this node.

  @param params: unused
  @return: result of C{backend.NodeVolumes}

  """
  return backend.NodeVolumes()
870 871 @staticmethod
873 """Demote a node from the master candidate role. 874 875 """ 876 return backend.DemoteFromMC()
@staticmethod
def perspective_node_powercycle(params):
  """Try to powercycle the node.

  @param params: (hypervisor type, hypervisor parameters)
  @return: result of C{backend.PowercycleNode}

  """
  (hv_type, hvparams) = params
  return backend.PowercycleNode(hv_type, hvparams)
885 886 @staticmethod
888 """Sets up OpenvSwitch on the node. 889 890 """ 891 (ovs_name, ovs_link) = params 892 return backend.ConfigureOVS(ovs_name, ovs_link)
893 894 @staticmethod
896 """Gets the node's public crypto tokens. 897 898 """ 899 token_requests = params[0] 900 return backend.GetCryptoTokens(token_requests)
901 902 @staticmethod
904 """Ensure daemon is running. 905 906 """ 907 (daemon_name, run) = params 908 return backend.EnsureDaemon(daemon_name, run)
# cluster --------------------------

@staticmethod
def perspective_version(params):
  """Query version information.

  @param params: unused
  @return: the value of C{constants.PROTOCOL_VERSION}

  """
  return constants.PROTOCOL_VERSION
@staticmethod
def perspective_upload_file(params):
  """Upload a file.

  @param params: sequence whose first element is the argument list that is
      expanded into the C{backend.UploadFile} call
  @return: result of C{backend.UploadFile}

  """
  upload_args = params[0]
  return backend.UploadFile(*upload_args)
928 929 @staticmethod
931 """Upload a file. 932 933 Note that the backend implementation imposes strict rules on which 934 files are accepted. 935 936 """ 937 return backend.UploadFile(*params)
@staticmethod
def perspective_master_node_name(params):
  """Return the master node name.

  @param params: unused
  @return: result of C{backend.GetMasterNodeName}

  """
  return backend.GetMasterNodeName()
@staticmethod
def perspective_run_oob(params):
  """Run oob on node.

  @param params: sequence whose first four elements are passed, in order,
      to C{backend.RunOob}
  @return: the JSON-decoded backend output, or None if the output is empty

  """
  output = backend.RunOob(params[0], params[1], params[2], params[3])
  if not output:
    return None

  return serializer.LoadJson(output)
957 958 @staticmethod
960 """Runs a restricted command. 961 962 """ 963 (cmd, ) = params 964 965 return backend.RunRestrictedCmd(cmd)
966 967 @staticmethod
969 """Write ssconf files. 970 971 """ 972 (values,) = params 973 return ssconf.WriteSsconfFiles(values)
@staticmethod
def perspective_get_watcher_pause(params):
  """Get watcher pause end.

  @param params: unused
  @return: result of C{utils.ReadWatcherPauseFile} for
      C{pathutils.WATCHER_PAUSEFILE}

  """
  pause_file = pathutils.WATCHER_PAUSEFILE
  return utils.ReadWatcherPauseFile(pause_file)
@staticmethod
def perspective_set_watcher_pause(params):
  """Set watcher pause.

  @param params: one-element sequence holding the pause end
  @return: result of C{backend.SetWatcherPause}

  """
  (pause_until, ) = params
  return backend.SetWatcherPause(pause_until)
@staticmethod
def perspective_get_file_info(params):
  """Get info on whether a file exists and its properties.

  @param params: one-element sequence holding the file path
  @return: result of C{backend.GetFileInfo}

  """
  (file_path, ) = params
  return backend.GetFileInfo(file_path)
# os -----------------------

@staticmethod
def perspective_os_diagnose(params):
  """Query detailed information about existing OSes.

  @param params: unused
  @return: result of C{backend.DiagnoseOS}

  """
  return backend.DiagnoseOS()
@staticmethod
def perspective_os_validate(params):
  """Run a given OS' validation routine.

  @param params: (required, OS name, checks, OS parameters, force_variant)
  @return: result of C{backend.ValidateOS}

  """
  # The OS parameters get their own name instead of rebinding "params"
  (required, name, checks, os_params, force_variant) = params
  return backend.ValidateOS(required, name, checks, os_params, force_variant)
@staticmethod
def perspective_os_export(params):
  """Export an OS definition into an instance specific package.

  @param params: sequence holding the serialized instance and the
      environment overrides, in that order
  @return: result of C{backend.ExportOS}

  """
  instance = objects.Instance.FromDict(params[0])
  return backend.ExportOS(instance, params[1])
1023 1024 # extstorage ----------------------- 1025 1026 @staticmethod
1028 """Query detailed information about existing extstorage providers. 1029 1030 """ 1031 return backend.DiagnoseExtStorage()
# hooks -----------------------

@staticmethod
def perspective_hooks_runner(params):
  """Run hook scripts.

  @param params: (hooks path, phase, environment)
  @return: result of the C{RunHooks} method of a new
      C{backend.HooksRunner}

  """
  (hpath, phase, env) = params
  runner = backend.HooksRunner()
  return runner.RunHooks(hpath, phase, env)
# iallocator -----------------

@staticmethod
def perspective_iallocator_runner(params):
  """Run an iallocator script.

  Every entry of the parameter dictionary is turned into a
  C{--key=value} argument for the allocator.

  @param params: (allocator name, input data, parameter dictionary)
  @return: result of the C{Run} method of a new
      C{backend.IAllocatorRunner}

  """
  (name, idata, ial_params_dict) = params
  ial_args = ["--" + key + "=" + value
              for (key, value) in ial_params_dict.items()]
  runner = backend.IAllocatorRunner()
  return runner.Run(name, idata, ial_args)
# test -----------------------

@staticmethod
def perspective_test_delay(params):
  """Run test delay.

  @param params: sequence whose first element is the delay duration
  @return: the value returned by C{utils.TestDelay} on success
  @raise backend.RPCFail: if C{utils.TestDelay} reports failure

  """
  (success, payload) = utils.TestDelay(params[0])
  if success:
    return payload

  raise backend.RPCFail(payload)
1070 1071 # file storage --------------- 1072 1073 @staticmethod
1075 """Create the file storage directory. 1076 1077 """ 1078 file_storage_dir = params[0] 1079 return backend.CreateFileStorageDir(file_storage_dir)
1080 1081 @staticmethod
1083 """Remove the file storage directory. 1084 1085 """ 1086 file_storage_dir = params[0] 1087 return backend.RemoveFileStorageDir(file_storage_dir)
1088 1089 @staticmethod
1091 """Rename the file storage directory. 1092 1093 """ 1094 old_file_storage_dir = params[0] 1095 new_file_storage_dir = params[1] 1096 return backend.RenameFileStorageDir(old_file_storage_dir, 1097 new_file_storage_dir)
# jobs ------------------------

@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_update(params):
  """Update job queue.

  @param params: (file name, content)
  @return: result of C{backend.JobQueueUpdate}

  """
  (queue_file, data) = params
  return backend.JobQueueUpdate(queue_file, data)
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_purge(params):
  """Purge job queue.

  @param params: unused
  @return: result of C{backend.JobQueuePurge}

  """
  return backend.JobQueuePurge()
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_rename(params):
  """Rename job queue files.

  @param params: sequence whose first element is a list of (old, new) pairs
  @return: list with the result of C{backend.JobQueueRename} for each pair

  """
  # TODO: What if a file fails to rename?
  renames = params[0]
  return [backend.JobQueueRename(src, dst) for (src, dst) in renames]
1126 1127 @staticmethod 1128 @_RequireJobQueueLock
1130 """Set job queue's drain flag. 1131 1132 """ 1133 (flag, ) = params 1134 1135 return jstore.SetDrainFlag(flag)
1136 1137 # hypervisor --------------- 1138 1139 @staticmethod
1141 """Validate the hypervisor parameters. 1142 1143 """ 1144 (hvname, hvparams) = params 1145 return backend.ValidateHVParams(hvname, hvparams)
# Crypto

@staticmethod
def perspective_x509_cert_create(params):
  """Create a new X509 certificate for SSL/TLS.

  @param params: one-element sequence holding the validity
  @return: result of C{backend.CreateX509Certificate}

  """
  (cert_validity, ) = params
  return backend.CreateX509Certificate(cert_validity)
@staticmethod
def perspective_x509_cert_remove(params):
  """Remove a X509 certificate.

  @param params: one-element sequence holding the certificate name
  @return: result of C{backend.RemoveX509Certificate}

  """
  (cert_name, ) = params
  return backend.RemoveX509Certificate(cert_name)
# Import and export

@staticmethod
def perspective_import_start(params):
  """Start an import daemon.

  @param params: (serialized options, serialized instance, component,
      (destination I/O type, destination I/O arguments))
  @return: result of C{backend.StartImportExportDaemon}

  """
  (opts_s, instance_dict, component, (dest, dest_args)) = params

  opts = objects.ImportExportOptions.FromDict(opts_s)
  instance = objects.Instance.FromDict(instance_dict)
  decoded_args = _DecodeImportExportIO(dest, dest_args)

  # Host and port are not used for imports
  return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                         None, None, instance,
                                         component, dest, decoded_args)
1182 1183 @staticmethod
def perspective_export_start(params):
  """Start an export daemon.

  @param params: sequence of the form
      C{(opts_s, host, port, instance, component, (source, source_args))},
      where C{opts_s} and C{instance} are the dict forms of
      L{objects.ImportExportOptions} and L{objects.Instance}
  @return: the result of L{backend.StartImportExportDaemon}

  """
  (raw_opts, host, port, raw_instance, component, source_spec) = params
  (source, source_args) = source_spec

  # Deserialise in the same order as the arguments are passed below
  ieopts = objects.ImportExportOptions.FromDict(raw_opts)
  inst = objects.Instance.FromDict(raw_instance)
  source_io = _DecodeImportExportIO(source, source_args)

  return backend.StartImportExportDaemon(constants.IEM_EXPORT, ieopts,
                                         host, port, inst, component,
                                         source, source_io)
1198 1199 @staticmethod
def perspective_impexp_status(params):
  """Retrieve the status of an import or export daemon.

  @param params: sequence whose first element is handed unchanged to
      L{backend.GetImportExportStatus}
  @return: the result of L{backend.GetImportExportStatus}

  """
  target = params[0]
  return backend.GetImportExportStatus(target)
1205 1206 @staticmethod
def perspective_impexp_abort(params):
  """Abort an import or export.

  @param params: sequence whose first element is handed unchanged to
      L{backend.AbortImportExport}
  @return: the result of L{backend.AbortImportExport}

  """
  target = params[0]
  return backend.AbortImportExport(target)
1212 1213 @staticmethod
def perspective_impexp_cleanup(params):
  """Clean up after an import or export.

  @param params: sequence whose first element is handed unchanged to
      L{backend.CleanupImportExport}
  @return: the result of L{backend.CleanupImportExport}

  """
  target = params[0]
  return backend.CleanupImportExport(target)
1219
def CheckNoded(_, args):
  """Run the start-up sanity checks, exiting the process on failure.

  The process exits with C{constants.EXIT_FAILURE} if any positional
  argument was given, or if the "string-escape" codec cannot be looked up.

  @param _: ignored
  @param args: leftover positional command line arguments; must be empty

  """
  if args: # noded doesn't take any arguments
    usage_msg = "Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0]
    print >> sys.stderr, usage_msg
    sys.exit(constants.EXIT_FAILURE)

  try:
    codecs.lookup("string-escape")
  except LookupError:
    codec_msg = ("Can't load the string-escape code which is part"
                 " of the Python installation. Is your installation"
                 " complete/correct? Aborting.")
    print >> sys.stderr, codec_msg
    sys.exit(constants.EXIT_FAILURE)
1236
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Verification callback checking a peer against the candidate cert map.

  There is a chicken-and-egg problem during cluster init and upgrade: this
  callback decides whether an incoming connection comes from a master
  candidate by looking its certificate up in the master candidate
  certificate map of the cluster configuration. During cluster init and
  cluster upgrade, however, RPC calls are made to the master node itself
  before that map exists and before the configuration is written, so the
  map cannot be consulted.

  The empty map is used as the marker for this situation. An initialized
  2.11 or higher cluster always has at least the master node's entry in the
  map, so an empty (or unreadable) map means we are still in the
  bootstrap/upgrade phase; the digest of the server certificate is then
  used as the only accepted digest.

  As a consequence, after an upgrade of Ganeti the system keeps working as
  before with server certificates only. Once client certificates have been
  generated with ``gnt-cluster renew-crypto --new-node-certificates``, RPC
  communication switches to client certificates and the server certificate
  fallback stops working.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object (unused)
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate
  @param errnum: unused, present because the callback API passes it
  @type errdepth: integer
  @param errdepth: number of the step in the certificate chain starting at 0
                   for the actual client certificate.
  @param ok: unused, present because the callback API passes it
  @return: whether the certificate is accepted; always C{False} for an
           C{errdepth} that is neither positive nor zero

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613

  # Anything above the lowest element of the certificate chain has to be
  # the server certificate itself.
  if errdepth > 0:
    own_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    accepted = cert.digest("sha1") == own_digest
    if not accepted:
      logging.debug("Received certificate from the certificate chain, which"
                    " does not match the server certficate. Digest of the"
                    " received certificate: %s. Digest of the server"
                    " certificate: %s.", cert.digest("sha1"), own_digest)
    return accepted

  # The client certificate proper: look it up in the candidate map.
  if errdepth == 0:
    store = ssconf.SimpleStore()
    try:
      known_certs = store.GetMasterCandidatesCertMap()
    except errors.ConfigurationError:
      logging.info("No candidate certificates found. Switching to "
                   "bootstrap/update mode.")
      known_certs = None

    if not known_certs:
      # Bootstrap/upgrade: only the server certificate is accepted
      bootstrap_digest = utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CERT_FILE)
      known_certs = {constants.CRYPTO_BOOTSTRAP: bootstrap_digest}

    accepted = cert.digest("sha1") in known_certs.values()
    if not accepted:
      logging.debug("Received certificate which is not a certificate of a"
                    " master candidate. Certificate digest: %s. List of master"
                    " candidate certificate digests: %s.", cert.digest("sha1"),
                    str(known_certs))
    return accepted

  logging.error("Invalid errdepth value: %s.", errdepth)
  return False
  # pylint: enable=W0613
def PrepNoded(options, _):
  """Prepare the node daemon; executed with the PID file held.

  Picks the request executor class, builds the SSL parameters, checks the
  job queue lock and starts the HTTP server.

  @param options: parsed command line options; C{mlock}, C{ssl},
      C{ssl_key}, C{ssl_cert}, C{bind_address} and C{port} are read
  @param _: ignored
  @return: tuple C{(mainloop, server)}

  """
  # Start from the plain executor and only upgrade to the mlockall variant
  # once locking the memory has actually worked.
  executor_cls = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
    else:
      executor_cls = MlockallRequestExecutor

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  queue_err = _PrepareQueueLock()
  if queue_err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_err)

  handler = NodeRequestHandler()
  mainloop = daemon.Mainloop()

  server = http.server.HttpServer(mainloop, options.bind_address,
                                  options.port, handler,
                                  ssl_params=ssl_params,
                                  ssl_verify_peer=True,
                                  request_executor_class=executor_cls,
                                  ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)
1350
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Run the node daemon main loop; executed with the PID file held.

  The server is stopped when the main loop ends, whether it returns
  normally or raises.

  @param options: unused
  @param args: unused
  @param prep_data: tuple C{(mainloop, server)}, as returned by L{PrepNoded}

  """
  loop, http_server = prep_data
  try:
    loop.Run()
  finally:
    http_server.Stop()
def Main():
  """Entry point of the node daemon.

  Builds the command line parser and hands control over to
  L{daemon.GenericMain}, with L{CheckNoded}, L{PrepNoded} and L{ExecNoded}
  as the check, preparation and execution callbacks.

  """
  usage_text = "%prog [-f] [-d] [-p port] [-b ADDRESS] [-i INTERFACE]"
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION

  parser = OptionParser(description="Ganeti node daemon",
                        usage=usage_text,
                        version=version_text)
  # Memory locking is on by default; the flag switches it off
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # The same file is passed as both certificate and key
  daemon.GenericMain(constants.NODED, parser,
                     CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True,
                     warn_breach=True)
1381