Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Ganeti node daemon""" 
  32   
  33  # pylint: disable=C0103,W0142 
  34   
  35  # C0103: Functions in this module need to have a given name structure, 
  36  # and the name of the daemon doesn't match 
  37   
  38  # W0142: Used * or ** magic, since we do use it extensively in this 
  39  # module 
  40   
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
  64   
  65   
  66  queue_lock = None 
def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality,
  the given reason and the current timestamp (nanoseconds since the epoch).

  @param trail: list of (source, reason, timestamp) entries; modified in place
  @param source: name of the noded functionality to record
  @param reason: optional free-form reason text

  """
  assert trail is not None
  # Qualify the source with the noded reason-source prefix so the entry
  # can be attributed to this daemon when the trail is inspected later
  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  trail.append((trail_source, reason, utils.EpochNano()))
77
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Initializes the module-level C{queue_lock} on first use; subsequent
  calls are no-ops. Errors are returned rather than raised so callers
  can decide whether a broken job queue is fatal.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    # Already initialized by an earlier call
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err
96
97 98 -def _RequireJobQueueLock(fn):
99 """Decorator for job queue manipulating functions. 100 101 """ 102 QUEUE_LOCK_TIMEOUT = 10 103 104 def wrapper(*args, **kwargs): 105 # Locking in exclusive, blocking mode because there could be several 106 # children running at the same time. Waiting up to 10 seconds. 107 if _PrepareQueueLock() is not None: 108 raise errors.JobQueueError("Job queue failed initialization," 109 " cannot update jobs") 110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) 111 try: 112 return fn(*args, **kwargs) 113 finally: 114 queue_lock.Unlock()
115 116 return wrapper 117
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  Deserializes the disk object embedded in the I/O arguments for the
  raw-disk and script I/O types; any other type is passed through
  unchanged.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: I/O arguments as received over RPC
  @return: decoded I/O arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    # A single serialized disk object
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    # A serialized disk object followed by one extra argument
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    # Nothing to decode for the remaining I/O types
    return ieioargs
133 134 -def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value. 136 137 Returns the given value, unless it evaluates to False. In the latter case the 138 default value is returned. 139 140 @param value: Value to return if it doesn't evaluate to False 141 @param default: Default value 142 @return: Given value or the default 143 144 """ 145 if value: 146 return value 147 148 return default
149
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock all of the process' memory into RAM before delegating to the
    # parent executor, so request handling cannot be swapped out
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
160 161 -class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation. 163 164 This class holds all methods exposed over the RPC interface. 165 166 """ 167 # too many public methods, and unused args - all methods get params 168 # due to the API 169 # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember the daemon's own PID so request handlers can signal the
    # daemon itself (e.g. SIGTERM on QuitGanetiException)
    self.noded_pid = os.getpid()
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the matching C{perspective_*} method,
    JSON-decoding the request body and JSON-encoding the
    (success, payload) result tuple.

    @param req: the HTTP request object
    @return: serialized JSON result
    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no handler exists for the requested path

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # RPC calls are mapped onto methods named "perspective_<path>"
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # Catch-all boundary: any other failure is logged and reported to
      # the caller as an RPC failure rather than killing the daemon
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
210 211 # the new block devices -------------------------- 212 213 @staticmethod
214 - def perspective_blockdev_create(params):
215 """Create a block device. 216 217 """ 218 (bdev_s, size, owner, on_primary, info, excl_stor) = params 219 bdev = objects.Disk.FromDict(bdev_s) 220 if bdev is None: 221 raise ValueError("can't unserialize data!") 222 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 223 excl_stor)
224 225 @staticmethod
227 """Pause/resume sync of a block device. 228 229 """ 230 disks_s, pause = params 231 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 232 return backend.BlockdevPauseResumeSync(disks, pause)
233 234 @staticmethod
235 - def perspective_blockdev_wipe(params):
236 """Wipe a block device. 237 238 """ 239 bdev_s, offset, size = params 240 bdev = objects.Disk.FromDict(bdev_s) 241 return backend.BlockdevWipe(bdev, offset, size)
242 243 @staticmethod
244 - def perspective_blockdev_remove(params):
245 """Remove a block device. 246 247 """ 248 bdev_s = params[0] 249 bdev = objects.Disk.FromDict(bdev_s) 250 return backend.BlockdevRemove(bdev)
251 252 @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    @param params: one-element list containing a list of
        (serialized disk, new unique_id) pairs

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
259 260 @staticmethod
261 - def perspective_blockdev_assemble(params):
262 """Assemble a block device. 263 264 """ 265 bdev_s, idict, on_primary, idx = params 266 bdev = objects.Disk.FromDict(bdev_s) 267 instance = objects.Instance.FromDict(idict) 268 if bdev is None: 269 raise ValueError("can't unserialize data!") 270 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
271 272 @staticmethod
273 - def perspective_blockdev_shutdown(params):
274 """Shutdown a block device. 275 276 """ 277 bdev_s = params[0] 278 bdev = objects.Disk.FromDict(bdev_s) 279 if bdev is None: 280 raise ValueError("can't unserialize data!") 281 return backend.BlockdevShutdown(bdev)
282 283 @staticmethod
285 """Add a child to a mirror device. 286 287 Note: this is only valid for mirror devices. It's the caller's duty 288 to send a correct disk, otherwise we raise an error. 289 290 """ 291 bdev_s, ndev_s = params 292 bdev = objects.Disk.FromDict(bdev_s) 293 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 294 if bdev is None or ndevs.count(None) > 0: 295 raise ValueError("can't unserialize data!") 296 return backend.BlockdevAddchildren(bdev, ndevs)
297 298 @staticmethod
300 """Remove a child from a mirror device. 301 302 This is only valid for mirror devices, of course. It's the callers 303 duty to send a correct disk, otherwise we raise an error. 304 305 """ 306 bdev_s, ndev_s = params 307 bdev = objects.Disk.FromDict(bdev_s) 308 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 309 if bdev is None or ndevs.count(None) > 0: 310 raise ValueError("can't unserialize data!") 311 return backend.BlockdevRemovechildren(bdev, ndevs)
312 313 @staticmethod
315 """Return the mirror status for a list of disks. 316 317 """ 318 disks = [objects.Disk.FromDict(dsk_s) 319 for dsk_s in params[0]] 320 return [status.ToDict() 321 for status in backend.BlockdevGetmirrorstatus(disks)]
322 323 @staticmethod
325 """Return the mirror status for a list of disks. 326 327 """ 328 (node_disks, ) = params 329 330 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 331 332 result = [] 333 334 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 335 if success: 336 result.append((success, status.ToDict())) 337 else: 338 result.append((success, status)) 339 340 return result
341 342 @staticmethod
343 - def perspective_blockdev_find(params):
344 """Expose the FindBlockDevice functionality for a disk. 345 346 This will try to find but not activate a disk. 347 348 """ 349 disk = objects.Disk.FromDict(params[0]) 350 351 result = backend.BlockdevFind(disk) 352 if result is None: 353 return None 354 355 return result.ToDict()
356 357 @staticmethod
358 - def perspective_blockdev_snapshot(params):
359 """Create a snapshot device. 360 361 Note that this is only valid for LVM disks, if we get passed 362 something else we raise an exception. The snapshot device can be 363 remove by calling the generic block device remove call. 364 365 """ 366 cfbd = objects.Disk.FromDict(params[0]) 367 return backend.BlockdevSnapshot(cfbd)
368 369 @staticmethod
370 - def perspective_blockdev_grow(params):
371 """Grow a stack of devices. 372 373 """ 374 if len(params) < 5: 375 raise ValueError("Received only %s parameters in blockdev_grow," 376 " old master?" % len(params)) 377 cfbd = objects.Disk.FromDict(params[0]) 378 amount = params[1] 379 dryrun = params[2] 380 backingstore = params[3] 381 excl_stor = params[4] 382 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
383 384 @staticmethod
385 - def perspective_blockdev_close(params):
386 """Closes the given block devices. 387 388 """ 389 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 390 return backend.BlockdevClose(params[0], disks)
391 392 @staticmethod
394 """Compute the sizes of the given block devices. 395 396 """ 397 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 398 return backend.BlockdevGetdimensions(disks)
399 400 @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    @param params: serialized disk, followed by destination node IP,
        destination path and cluster name

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node_ip, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node_ip, dest_path, cluster_name)
408 409 @staticmethod
410 - def perspective_blockdev_setinfo(params):
411 """Sets metadata information on the given block device. 412 413 """ 414 (disk, info) = params 415 disk = objects.Disk.FromDict(disk) 416 return backend.BlockdevSetInfo(disk, info)
417 418 # blockdev/drbd specific methods ---------- 419 420 @staticmethod
422 """Disconnects the network connection of drbd disks. 423 424 Note that this is only valid for drbd disks, so the members of the 425 disk list must all be drbd devices. 426 427 """ 428 (disks,) = params 429 disks = [objects.Disk.FromDict(disk) for disk in disks] 430 return backend.DrbdDisconnectNet(disks)
431 432 @staticmethod
433 - def perspective_drbd_attach_net(params):
434 """Attaches the network connection of drbd disks. 435 436 Note that this is only valid for drbd disks, so the members of the 437 disk list must all be drbd devices. 438 439 """ 440 disks, instance_name, multimaster = params 441 disks = [objects.Disk.FromDict(disk) for disk in disks] 442 return backend.DrbdAttachNet(disks, instance_name, multimaster)
443 444 @staticmethod
445 - def perspective_drbd_wait_sync(params):
446 """Wait until DRBD disks are synched. 447 448 Note that this is only valid for drbd disks, so the members of the 449 disk list must all be drbd devices. 450 451 """ 452 (disks,) = params 453 disks = [objects.Disk.FromDict(disk) for disk in disks] 454 return backend.DrbdWaitSync(disks)
455 456 @staticmethod
458 """Checks if the drbd devices need activation 459 460 Note that this is only valid for drbd disks, so the members of the 461 disk list must all be drbd devices. 462 463 """ 464 (disks,) = params 465 disks = [objects.Disk.FromDict(disk) for disk in disks] 466 return backend.DrbdNeedsActivation(disks)
467 468 @staticmethod
470 """Query drbd helper. 471 472 """ 473 return backend.GetDrbdUsermodeHelper()
474 475 # export/import -------------------------- 476 477 @staticmethod
478 - def perspective_finalize_export(params):
479 """Expose the finalize export functionality. 480 481 """ 482 instance = objects.Instance.FromDict(params[0]) 483 484 snap_disks = [] 485 for disk in params[1]: 486 if isinstance(disk, bool): 487 snap_disks.append(disk) 488 else: 489 snap_disks.append(objects.Disk.FromDict(disk)) 490 491 return backend.FinalizeExport(instance, snap_disks)
492 493 @staticmethod
494 - def perspective_export_info(params):
495 """Query information about an existing export on this node. 496 497 The given path may not contain an export, in which case we return 498 None. 499 500 """ 501 path = params[0] 502 return backend.ExportInfo(path)
503 504 @staticmethod
505 - def perspective_export_list(params):
506 """List the available exports on this node. 507 508 Note that as opposed to export_info, which may query data about an 509 export in any path, this only queries the standard Ganeti path 510 (pathutils.EXPORT_DIR). 511 512 """ 513 return backend.ListExports()
514 515 @staticmethod
516 - def perspective_export_remove(params):
517 """Remove an export. 518 519 """ 520 export = params[0] 521 return backend.RemoveExport(export)
522 523 # block device --------------------- 524 @staticmethod
525 - def perspective_bdev_sizes(params):
526 """Query the list of block devices 527 528 """ 529 devices = params[0] 530 return backend.GetBlockDevSizes(devices)
531 532 # volume -------------------------- 533 534 @staticmethod
535 - def perspective_lv_list(params):
536 """Query the list of logical volumes in a given volume group. 537 538 """ 539 vgname = params[0] 540 return backend.GetVolumeList(vgname)
541 542 @staticmethod
543 - def perspective_vg_list(params):
544 """Query the list of volume groups. 545 546 """ 547 return backend.ListVolumeGroups()
548 549 # Storage -------------------------- 550 551 @staticmethod
552 - def perspective_storage_list(params):
553 """Get list of storage units. 554 555 """ 556 (su_name, su_args, name, fields) = params 557 return container.GetStorage(su_name, *su_args).List(name, fields)
558 559 @staticmethod
560 - def perspective_storage_modify(params):
561 """Modify a storage unit. 562 563 """ 564 (su_name, su_args, name, changes) = params 565 return container.GetStorage(su_name, *su_args).Modify(name, changes)
566 567 @staticmethod
568 - def perspective_storage_execute(params):
569 """Execute an operation on a storage unit. 570 571 """ 572 (su_name, su_args, name, op) = params 573 return container.GetStorage(su_name, *su_args).Execute(name, op)
574 575 # bridge -------------------------- 576 577 @staticmethod
578 - def perspective_bridges_exist(params):
579 """Check if all bridges given exist on this node. 580 581 """ 582 bridges_list = params[0] 583 return backend.BridgesExist(bridges_list)
584 585 # instance -------------------------- 586 587 @staticmethod
588 - def perspective_instance_os_add(params):
589 """Install an OS on a given instance. 590 591 """ 592 inst_s = params[0] 593 inst = objects.Instance.FromDict(inst_s) 594 reinstall = params[1] 595 debug = params[2] 596 return backend.InstanceOsAdd(inst, reinstall, debug)
597 598 @staticmethod
600 """Runs the OS rename script for an instance. 601 602 """ 603 inst_s, old_name, debug = params 604 inst = objects.Instance.FromDict(inst_s) 605 return backend.RunRenameInstance(inst, old_name, debug)
606 607 @staticmethod
608 - def perspective_instance_shutdown(params):
609 """Shutdown an instance. 610 611 """ 612 instance = objects.Instance.FromDict(params[0]) 613 timeout = params[1] 614 trail = params[2] 615 _extendReasonTrail(trail, "shutdown") 616 return backend.InstanceShutdown(instance, timeout, trail)
617 618 @staticmethod
619 - def perspective_instance_start(params):
620 """Start an instance. 621 622 """ 623 (instance_name, startup_paused, trail) = params 624 instance = objects.Instance.FromDict(instance_name) 625 _extendReasonTrail(trail, "start") 626 return backend.StartInstance(instance, startup_paused, trail)
627 628 @staticmethod
629 - def perspective_hotplug_device(params):
630 """Hotplugs device to a running instance. 631 632 """ 633 (idict, action, dev_type, ddict, extra, seq) = params 634 instance = objects.Instance.FromDict(idict) 635 if dev_type == constants.HOTPLUG_TARGET_DISK: 636 device = objects.Disk.FromDict(ddict) 637 elif dev_type == constants.HOTPLUG_TARGET_NIC: 638 device = objects.NIC.FromDict(ddict) 639 else: 640 assert dev_type in constants.HOTPLUG_ALL_TARGETS 641 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
642 643 @staticmethod
644 - def perspective_hotplug_supported(params):
645 """Checks if hotplug is supported. 646 647 """ 648 instance = objects.Instance.FromDict(params[0]) 649 return backend.HotplugSupported(instance)
650 651 @staticmethod
652 - def perspective_migration_info(params):
653 """Gather information about an instance to be migrated. 654 655 """ 656 instance = objects.Instance.FromDict(params[0]) 657 return backend.MigrationInfo(instance)
658 659 @staticmethod
660 - def perspective_accept_instance(params):
661 """Prepare the node to accept an instance. 662 663 """ 664 instance, info, target = params 665 instance = objects.Instance.FromDict(instance) 666 return backend.AcceptInstance(instance, info, target)
667 668 @staticmethod
670 """Finalize the instance migration on the destination node. 671 672 """ 673 instance, info, success = params 674 instance = objects.Instance.FromDict(instance) 675 return backend.FinalizeMigrationDst(instance, info, success)
676 677 @staticmethod
678 - def perspective_instance_migrate(params):
679 """Migrates an instance. 680 681 """ 682 cluster_name, instance, target, live = params 683 instance = objects.Instance.FromDict(instance) 684 return backend.MigrateInstance(cluster_name, instance, target, live)
685 686 @staticmethod
688 """Finalize the instance migration on the source node. 689 690 """ 691 instance, success, live = params 692 instance = objects.Instance.FromDict(instance) 693 return backend.FinalizeMigrationSource(instance, success, live)
694 695 @staticmethod
697 """Reports migration status. 698 699 """ 700 instance = objects.Instance.FromDict(params[0]) 701 return backend.GetMigrationStatus(instance).ToDict()
702 703 @staticmethod
704 - def perspective_instance_reboot(params):
705 """Reboot an instance. 706 707 """ 708 instance = objects.Instance.FromDict(params[0]) 709 reboot_type = params[1] 710 shutdown_timeout = params[2] 711 trail = params[3] 712 _extendReasonTrail(trail, "reboot") 713 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 714 trail)
715 716 @staticmethod
718 """Modify instance runtime memory. 719 720 """ 721 instance_dict, memory = params 722 instance = objects.Instance.FromDict(instance_dict) 723 return backend.InstanceBalloonMemory(instance, memory)
724 725 @staticmethod
726 - def perspective_instance_info(params):
727 """Query instance information. 728 729 """ 730 (instance_name, hypervisor_name, hvparams) = params 731 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
732 733 @staticmethod
735 """Query whether the specified instance can be migrated. 736 737 """ 738 instance = objects.Instance.FromDict(params[0]) 739 return backend.GetInstanceMigratable(instance)
740 741 @staticmethod
743 """Query information about all instances. 744 745 """ 746 (hypervisor_list, all_hvparams) = params 747 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
748 749 @staticmethod
750 - def perspective_instance_list(params):
751 """Query the list of running instances. 752 753 """ 754 (hypervisor_list, hvparams) = params 755 return backend.GetInstanceList(hypervisor_list, hvparams)
756 757 # node -------------------------- 758 759 @staticmethod
761 """Checks if a node has the given ip address. 762 763 """ 764 return netutils.IPAddress.Own(params[0])
765 766 @staticmethod
767 - def perspective_node_info(params):
768 """Query node information. 769 770 """ 771 (storage_units, hv_specs) = params 772 return backend.GetNodeInfo(storage_units, hv_specs)
773 774 @staticmethod
775 - def perspective_etc_hosts_modify(params):
776 """Modify a node entry in /etc/hosts. 777 778 """ 779 backend.EtcHostsModify(params[0], params[1], params[2]) 780 781 return True
782 783 @staticmethod
784 - def perspective_node_verify(params):
785 """Run a verify sequence on this node. 786 787 """ 788 (what, cluster_name, hvparams) = params 789 return backend.VerifyNode(what, cluster_name, hvparams)
790 791 @classmethod
792 - def perspective_node_verify_light(cls, params):
793 """Run a light verify sequence on this node. 794 795 """ 796 # So far it's the same as the normal node_verify 797 return cls.perspective_node_verify(params)
798 799 @staticmethod
801 """Start the master daemons on this node. 802 803 """ 804 return backend.StartMasterDaemons(params[0])
805 806 @staticmethod
808 """Activate the master IP on this node. 809 810 """ 811 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 812 return backend.ActivateMasterIp(master_params, params[1])
813 814 @staticmethod
816 """Deactivate the master IP on this node. 817 818 """ 819 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 820 return backend.DeactivateMasterIp(master_params, params[1])
821 822 @staticmethod
823 - def perspective_node_stop_master(params):
824 """Stops master daemons on this node. 825 826 """ 827 return backend.StopMasterDaemons()
828 829 @staticmethod
831 """Change the master IP netmask. 832 833 """ 834 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 835 params[3])
836 837 @staticmethod
839 """Cleanup after leaving a cluster. 840 841 """ 842 return backend.LeaveCluster(params[0])
843 844 @staticmethod
845 - def perspective_node_volumes(params):
846 """Query the list of all logical volume groups. 847 848 """ 849 return backend.NodeVolumes()
850 851 @staticmethod
853 """Demote a node from the master candidate role. 854 855 """ 856 return backend.DemoteFromMC()
857 858 @staticmethod
859 - def perspective_node_powercycle(params):
860 """Tries to powercycle the node. 861 862 """ 863 (hypervisor_type, hvparams) = params 864 return backend.PowercycleNode(hypervisor_type, hvparams)
865 866 @staticmethod
868 """Sets up OpenvSwitch on the node. 869 870 """ 871 (ovs_name, ovs_link) = params 872 return backend.ConfigureOVS(ovs_name, ovs_link)
873 874 # cluster -------------------------- 875 876 @staticmethod
877 - def perspective_version(params):
878 """Query version information. 879 880 """ 881 return constants.PROTOCOL_VERSION
882 883 @staticmethod
884 - def perspective_upload_file(params):
885 """Upload a file. 886 887 Note that the backend implementation imposes strict rules on which 888 files are accepted. 889 890 """ 891 return backend.UploadFile(*(params[0]))
892 893 @staticmethod
894 - def perspective_master_info(params):
895 """Query master information. 896 897 """ 898 return backend.GetMasterInfo()
899 900 @staticmethod
901 - def perspective_run_oob(params):
902 """Runs oob on node. 903 904 """ 905 output = backend.RunOob(params[0], params[1], params[2], params[3]) 906 if output: 907 result = serializer.LoadJson(output) 908 else: 909 result = None 910 return result
911 912 @staticmethod
914 """Runs a restricted command. 915 916 """ 917 (cmd, ) = params 918 919 return backend.RunRestrictedCmd(cmd)
920 921 @staticmethod
923 """Write ssconf files. 924 925 """ 926 (values,) = params 927 return ssconf.WriteSsconfFiles(values)
928 929 @staticmethod
930 - def perspective_get_watcher_pause(params):
931 """Get watcher pause end. 932 933 """ 934 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
935 936 @staticmethod
937 - def perspective_set_watcher_pause(params):
938 """Set watcher pause. 939 940 """ 941 (until, ) = params 942 return backend.SetWatcherPause(until)
943 944 # os ----------------------- 945 946 @staticmethod
947 - def perspective_os_diagnose(params):
948 """Query detailed information about existing OSes. 949 950 """ 951 return backend.DiagnoseOS()
952 953 @staticmethod
954 - def perspective_os_get(params):
955 """Query information about a given OS. 956 957 """ 958 name = params[0] 959 os_obj = backend.OSFromDisk(name) 960 return os_obj.ToDict()
961 962 @staticmethod
963 - def perspective_os_validate(params):
964 """Run a given OS' validation routine. 965 966 """ 967 required, name, checks, params = params 968 return backend.ValidateOS(required, name, checks, params)
969 970 # extstorage ----------------------- 971 972 @staticmethod
974 """Query detailed information about existing extstorage providers. 975 976 """ 977 return backend.DiagnoseExtStorage()
978 979 # hooks ----------------------- 980 981 @staticmethod
982 - def perspective_hooks_runner(params):
983 """Run hook scripts. 984 985 """ 986 hpath, phase, env = params 987 hr = backend.HooksRunner() 988 return hr.RunHooks(hpath, phase, env)
989 990 # iallocator ----------------- 991 992 @staticmethod
993 - def perspective_iallocator_runner(params):
994 """Run an iallocator script. 995 996 """ 997 name, idata = params 998 iar = backend.IAllocatorRunner() 999 return iar.Run(name, idata)
1000 1001 # test ----------------------- 1002 1003 @staticmethod
1004 - def perspective_test_delay(params):
1005 """Run test delay. 1006 1007 """ 1008 duration = params[0] 1009 status, rval = utils.TestDelay(duration) 1010 if not status: 1011 raise backend.RPCFail(rval) 1012 return rval
1013 1014 # file storage --------------- 1015 1016 @staticmethod
1018 """Create the file storage directory. 1019 1020 """ 1021 file_storage_dir = params[0] 1022 return backend.CreateFileStorageDir(file_storage_dir)
1023 1024 @staticmethod
1026 """Remove the file storage directory. 1027 1028 """ 1029 file_storage_dir = params[0] 1030 return backend.RemoveFileStorageDir(file_storage_dir)
1031 1032 @staticmethod
1034 """Rename the file storage directory. 1035 1036 """ 1037 old_file_storage_dir = params[0] 1038 new_file_storage_dir = params[1] 1039 return backend.RenameFileStorageDir(old_file_storage_dir, 1040 new_file_storage_dir)
1041 1042 # jobs ------------------------ 1043 1044 @staticmethod 1045 @_RequireJobQueueLock
1046 - def perspective_jobqueue_update(params):
1047 """Update job queue. 1048 1049 """ 1050 (file_name, content) = params 1051 return backend.JobQueueUpdate(file_name, content)
1052 1053 @staticmethod 1054 @_RequireJobQueueLock
1055 - def perspective_jobqueue_purge(params):
1056 """Purge job queue. 1057 1058 """ 1059 return backend.JobQueuePurge()
1060 1061 @staticmethod 1062 @_RequireJobQueueLock
1063 - def perspective_jobqueue_rename(params):
1064 """Rename a job queue file. 1065 1066 """ 1067 # TODO: What if a file fails to rename? 1068 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1069 1070 @staticmethod 1071 @_RequireJobQueueLock
1073 """Set job queue's drain flag. 1074 1075 """ 1076 (flag, ) = params 1077 1078 return jstore.SetDrainFlag(flag)
1079 1080 # hypervisor --------------- 1081 1082 @staticmethod
1084 """Validate the hypervisor parameters. 1085 1086 """ 1087 (hvname, hvparams) = params 1088 return backend.ValidateHVParams(hvname, hvparams)
1089 1090 # Crypto 1091 1092 @staticmethod
1093 - def perspective_x509_cert_create(params):
1094 """Creates a new X509 certificate for SSL/TLS. 1095 1096 """ 1097 (validity, ) = params 1098 return backend.CreateX509Certificate(validity)
1099 1100 @staticmethod
1101 - def perspective_x509_cert_remove(params):
1102 """Removes a X509 certificate. 1103 1104 """ 1105 (name, ) = params 1106 return backend.RemoveX509Certificate(name)
1107 1108 # Import and export 1109 1110 @staticmethod
1111 - def perspective_import_start(params):
1112 """Starts an import daemon. 1113 1114 """ 1115 (opts_s, instance, component, (dest, dest_args)) = params 1116 1117 opts = objects.ImportExportOptions.FromDict(opts_s) 1118 1119 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1120 None, None, 1121 objects.Instance.FromDict(instance), 1122 component, dest, 1123 _DecodeImportExportIO(dest, 1124 dest_args))
1125 1126 @staticmethod
1127 - def perspective_export_start(params):
1128 """Starts an export daemon. 1129 1130 """ 1131 (opts_s, host, port, instance, component, (source, source_args)) = params 1132 1133 opts = objects.ImportExportOptions.FromDict(opts_s) 1134 1135 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1136 host, port, 1137 objects.Instance.FromDict(instance), 1138 component, source, 1139 _DecodeImportExportIO(source, 1140 source_args))
1141 1142 @staticmethod
1143 - def perspective_impexp_status(params):
1144 """Retrieves the status of an import or export daemon. 1145 1146 """ 1147 return backend.GetImportExportStatus(params[0])
1148 1149 @staticmethod
1150 - def perspective_impexp_abort(params):
1151 """Aborts an import or export. 1152 1153 """ 1154 return backend.AbortImportExport(params[0])
1155 1156 @staticmethod
1157 - def perspective_impexp_cleanup(params):
1158 """Cleans up after an import or export. 1159 1160 """ 1161 return backend.CleanupImportExport(params[0])
1162
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  @param _: unused options argument (part of the daemon check callback API)
  @param args: positional command line arguments

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # Make sure the "string-escape" codec is available; its absence
    # indicates an incomplete/broken Python installation, so abort early
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1179
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @param options: parsed command line options
  @param _: unused positional arguments
  @return: (mainloop, server) tuple to be passed to L{ExecNoded}

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      # Try mlock already here so a missing ctypes module is detected at
      # startup; in that case fall back to the plain request executor
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1219
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command line options (unused)
  @param args: positional arguments (unused)
  @param prep_data: (mainloop, server) tuple as returned by L{PrepNoded}

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # Stop the server even if the main loop terminates with an exception
    server.Stop()
def Main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  main loop.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # The node daemon certificate doubles as both SSL key and certificate
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)
1249