Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
  55   
  56   
  57  queue_lock = None 
58 59 60 -def _PrepareQueueLock():
61 """Try to prepare the queue lock. 62 63 @return: None for success, otherwise an exception object 64 65 """ 66 global queue_lock # pylint: disable=W0603 67 68 if queue_lock is not None: 69 return None 70 71 # Prepare job queue 72 try: 73 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 74 return None 75 except EnvironmentError, err: 76 return err
77
78 79 -def _RequireJobQueueLock(fn):
80 """Decorator for job queue manipulating functions. 81 82 """ 83 QUEUE_LOCK_TIMEOUT = 10 84 85 def wrapper(*args, **kwargs): 86 # Locking in exclusive, blocking mode because there could be several 87 # children running at the same time. Waiting up to 10 seconds. 88 if _PrepareQueueLock() is not None: 89 raise errors.JobQueueError("Job queue failed initialization," 90 " cannot update jobs") 91 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) 92 try: 93 return fn(*args, **kwargs) 94 finally: 95 queue_lock.Unlock()
96 97 return wrapper 98
99 100 -def _DecodeImportExportIO(ieio, ieioargs):
101 """Decodes import/export I/O information. 102 103 """ 104 if ieio == constants.IEIO_RAW_DISK: 105 assert len(ieioargs) == 1 106 return (objects.Disk.FromDict(ieioargs[0]), ) 107 108 if ieio == constants.IEIO_SCRIPT: 109 assert len(ieioargs) == 2 110 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1]) 111 112 return ieioargs
113
114 115 -class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
116 """Subclass ensuring request handlers are locked in RAM. 117 118 """
119 - def __init__(self, *args, **kwargs):
120 utils.Mlockall() 121 122 http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
123
124 125 -class NodeRequestHandler(http.server.HttpServerHandler):
126 """The server implementation. 127 128 This class holds all methods exposed over the RPC interface. 129 130 """ 131 # too many public methods, and unused args - all methods get params 132 # due to the API 133 # pylint: disable=R0904,W0613
134 - def __init__(self):
135 http.server.HttpServerHandler.__init__(self) 136 self.noded_pid = os.getpid()
137
138 - def HandleRequest(self, req):
139 """Handle a request. 140 141 """ 142 if req.request_method.upper() != http.HTTP_POST: 143 raise http.HttpBadRequest("Only the POST method is supported") 144 145 path = req.request_path 146 if path.startswith("/"): 147 path = path[1:] 148 149 method = getattr(self, "perspective_%s" % path, None) 150 if method is None: 151 raise http.HttpNotFound() 152 153 try: 154 result = (True, method(serializer.LoadJson(req.request_body))) 155 156 except backend.RPCFail, err: 157 # our custom failure exception; str(err) works fine if the 158 # exception was constructed with a single argument, and in 159 # this case, err.message == err.args[0] == str(err) 160 result = (False, str(err)) 161 except errors.QuitGanetiException, err: 162 # Tell parent to quit 163 logging.info("Shutting down the node daemon, arguments: %s", 164 str(err.args)) 165 os.kill(self.noded_pid, signal.SIGTERM) 166 # And return the error's arguments, which must be already in 167 # correct tuple format 168 result = err.args 169 except Exception, err: 170 logging.exception("Error in RPC call") 171 result = (False, "Error while executing backend function: %s" % str(err)) 172 173 return serializer.DumpJson(result)
174 175 # the new block devices -------------------------- 176 177 @staticmethod
178 - def perspective_blockdev_create(params):
179 """Create a block device. 180 181 """ 182 (bdev_s, size, owner, on_primary, info, excl_stor) = params 183 bdev = objects.Disk.FromDict(bdev_s) 184 if bdev is None: 185 raise ValueError("can't unserialize data!") 186 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 187 excl_stor)
188 189 @staticmethod
191 """Pause/resume sync of a block device. 192 193 """ 194 disks_s, pause = params 195 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 196 return backend.BlockdevPauseResumeSync(disks, pause)
197 198 @staticmethod
199 - def perspective_blockdev_wipe(params):
200 """Wipe a block device. 201 202 """ 203 bdev_s, offset, size = params 204 bdev = objects.Disk.FromDict(bdev_s) 205 return backend.BlockdevWipe(bdev, offset, size)
206 207 @staticmethod
208 - def perspective_blockdev_remove(params):
209 """Remove a block device. 210 211 """ 212 bdev_s = params[0] 213 bdev = objects.Disk.FromDict(bdev_s) 214 return backend.BlockdevRemove(bdev)
215 216 @staticmethod
217 - def perspective_blockdev_rename(params):
218 """Remove a block device. 219 220 """ 221 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]] 222 return backend.BlockdevRename(devlist)
223 224 @staticmethod
225 - def perspective_blockdev_assemble(params):
226 """Assemble a block device. 227 228 """ 229 bdev_s, owner, on_primary, idx = params 230 bdev = objects.Disk.FromDict(bdev_s) 231 if bdev is None: 232 raise ValueError("can't unserialize data!") 233 return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
234 235 @staticmethod
236 - def perspective_blockdev_shutdown(params):
237 """Shutdown a block device. 238 239 """ 240 bdev_s = params[0] 241 bdev = objects.Disk.FromDict(bdev_s) 242 if bdev is None: 243 raise ValueError("can't unserialize data!") 244 return backend.BlockdevShutdown(bdev)
245 246 @staticmethod
248 """Add a child to a mirror device. 249 250 Note: this is only valid for mirror devices. It's the caller's duty 251 to send a correct disk, otherwise we raise an error. 252 253 """ 254 bdev_s, ndev_s = params 255 bdev = objects.Disk.FromDict(bdev_s) 256 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 257 if bdev is None or ndevs.count(None) > 0: 258 raise ValueError("can't unserialize data!") 259 return backend.BlockdevAddchildren(bdev, ndevs)
260 261 @staticmethod
263 """Remove a child from a mirror device. 264 265 This is only valid for mirror devices, of course. It's the callers 266 duty to send a correct disk, otherwise we raise an error. 267 268 """ 269 bdev_s, ndev_s = params 270 bdev = objects.Disk.FromDict(bdev_s) 271 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 272 if bdev is None or ndevs.count(None) > 0: 273 raise ValueError("can't unserialize data!") 274 return backend.BlockdevRemovechildren(bdev, ndevs)
275 276 @staticmethod
278 """Return the mirror status for a list of disks. 279 280 """ 281 disks = [objects.Disk.FromDict(dsk_s) 282 for dsk_s in params[0]] 283 return [status.ToDict() 284 for status in backend.BlockdevGetmirrorstatus(disks)]
285 286 @staticmethod
288 """Return the mirror status for a list of disks. 289 290 """ 291 (node_disks, ) = params 292 293 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 294 295 result = [] 296 297 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 298 if success: 299 result.append((success, status.ToDict())) 300 else: 301 result.append((success, status)) 302 303 return result
304 305 @staticmethod
306 - def perspective_blockdev_find(params):
307 """Expose the FindBlockDevice functionality for a disk. 308 309 This will try to find but not activate a disk. 310 311 """ 312 disk = objects.Disk.FromDict(params[0]) 313 314 result = backend.BlockdevFind(disk) 315 if result is None: 316 return None 317 318 return result.ToDict()
319 320 @staticmethod
321 - def perspective_blockdev_snapshot(params):
322 """Create a snapshot device. 323 324 Note that this is only valid for LVM disks, if we get passed 325 something else we raise an exception. The snapshot device can be 326 remove by calling the generic block device remove call. 327 328 """ 329 cfbd = objects.Disk.FromDict(params[0]) 330 return backend.BlockdevSnapshot(cfbd)
331 332 @staticmethod
333 - def perspective_blockdev_grow(params):
334 """Grow a stack of devices. 335 336 """ 337 if len(params) < 4: 338 raise ValueError("Received only 3 parameters in blockdev_grow," 339 " old master?") 340 cfbd = objects.Disk.FromDict(params[0]) 341 amount = params[1] 342 dryrun = params[2] 343 backingstore = params[3] 344 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
345 346 @staticmethod
347 - def perspective_blockdev_close(params):
348 """Closes the given block devices. 349 350 """ 351 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 352 return backend.BlockdevClose(params[0], disks)
353 354 @staticmethod
355 - def perspective_blockdev_getsize(params):
356 """Compute the sizes of the given block devices. 357 358 """ 359 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 360 return backend.BlockdevGetsize(disks)
361 362 @staticmethod
363 - def perspective_blockdev_export(params):
364 """Compute the sizes of the given block devices. 365 366 """ 367 disk = objects.Disk.FromDict(params[0]) 368 dest_node, dest_path, cluster_name = params[1:] 369 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
370 371 @staticmethod
372 - def perspective_blockdev_setinfo(params):
373 """Sets metadata information on the given block device. 374 375 """ 376 (disk, info) = params 377 disk = objects.Disk.FromDict(disk) 378 return backend.BlockdevSetInfo(disk, info)
379 380 # blockdev/drbd specific methods ---------- 381 382 @staticmethod
384 """Disconnects the network connection of drbd disks. 385 386 Note that this is only valid for drbd disks, so the members of the 387 disk list must all be drbd devices. 388 389 """ 390 nodes_ip, disks = params 391 disks = [objects.Disk.FromDict(cf) for cf in disks] 392 return backend.DrbdDisconnectNet(nodes_ip, disks)
393 394 @staticmethod
395 - def perspective_drbd_attach_net(params):
396 """Attaches the network connection of drbd disks. 397 398 Note that this is only valid for drbd disks, so the members of the 399 disk list must all be drbd devices. 400 401 """ 402 nodes_ip, disks, instance_name, multimaster = params 403 disks = [objects.Disk.FromDict(cf) for cf in disks] 404 return backend.DrbdAttachNet(nodes_ip, disks, 405 instance_name, multimaster)
406 407 @staticmethod
408 - def perspective_drbd_wait_sync(params):
409 """Wait until DRBD disks are synched. 410 411 Note that this is only valid for drbd disks, so the members of the 412 disk list must all be drbd devices. 413 414 """ 415 nodes_ip, disks = params 416 disks = [objects.Disk.FromDict(cf) for cf in disks] 417 return backend.DrbdWaitSync(nodes_ip, disks)
418 419 @staticmethod
420 - def perspective_drbd_helper(params):
421 """Query drbd helper. 422 423 """ 424 return backend.GetDrbdUsermodeHelper()
425 426 # export/import -------------------------- 427 428 @staticmethod
429 - def perspective_finalize_export(params):
430 """Expose the finalize export functionality. 431 432 """ 433 instance = objects.Instance.FromDict(params[0]) 434 435 snap_disks = [] 436 for disk in params[1]: 437 if isinstance(disk, bool): 438 snap_disks.append(disk) 439 else: 440 snap_disks.append(objects.Disk.FromDict(disk)) 441 442 return backend.FinalizeExport(instance, snap_disks)
443 444 @staticmethod
445 - def perspective_export_info(params):
446 """Query information about an existing export on this node. 447 448 The given path may not contain an export, in which case we return 449 None. 450 451 """ 452 path = params[0] 453 return backend.ExportInfo(path)
454 455 @staticmethod
456 - def perspective_export_list(params):
457 """List the available exports on this node. 458 459 Note that as opposed to export_info, which may query data about an 460 export in any path, this only queries the standard Ganeti path 461 (pathutils.EXPORT_DIR). 462 463 """ 464 return backend.ListExports()
465 466 @staticmethod
467 - def perspective_export_remove(params):
468 """Remove an export. 469 470 """ 471 export = params[0] 472 return backend.RemoveExport(export)
473 474 # block device --------------------- 475 @staticmethod
476 - def perspective_bdev_sizes(params):
477 """Query the list of block devices 478 479 """ 480 devices = params[0] 481 return backend.GetBlockDevSizes(devices)
482 483 # volume -------------------------- 484 485 @staticmethod
486 - def perspective_lv_list(params):
487 """Query the list of logical volumes in a given volume group. 488 489 """ 490 vgname = params[0] 491 return backend.GetVolumeList(vgname)
492 493 @staticmethod
494 - def perspective_vg_list(params):
495 """Query the list of volume groups. 496 497 """ 498 return backend.ListVolumeGroups()
499 500 # Storage -------------------------- 501 502 @staticmethod
503 - def perspective_storage_list(params):
504 """Get list of storage units. 505 506 """ 507 (su_name, su_args, name, fields) = params 508 return storage.GetStorage(su_name, *su_args).List(name, fields)
509 510 @staticmethod
511 - def perspective_storage_modify(params):
512 """Modify a storage unit. 513 514 """ 515 (su_name, su_args, name, changes) = params 516 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
517 518 @staticmethod
519 - def perspective_storage_execute(params):
520 """Execute an operation on a storage unit. 521 522 """ 523 (su_name, su_args, name, op) = params 524 return storage.GetStorage(su_name, *su_args).Execute(name, op)
525 526 # bridge -------------------------- 527 528 @staticmethod
529 - def perspective_bridges_exist(params):
530 """Check if all bridges given exist on this node. 531 532 """ 533 bridges_list = params[0] 534 return backend.BridgesExist(bridges_list)
535 536 # instance -------------------------- 537 538 @staticmethod
539 - def perspective_instance_os_add(params):
540 """Install an OS on a given instance. 541 542 """ 543 inst_s = params[0] 544 inst = objects.Instance.FromDict(inst_s) 545 reinstall = params[1] 546 debug = params[2] 547 return backend.InstanceOsAdd(inst, reinstall, debug)
548 549 @staticmethod
551 """Runs the OS rename script for an instance. 552 553 """ 554 inst_s, old_name, debug = params 555 inst = objects.Instance.FromDict(inst_s) 556 return backend.RunRenameInstance(inst, old_name, debug)
557 558 @staticmethod
559 - def perspective_instance_shutdown(params):
560 """Shutdown an instance. 561 562 """ 563 instance = objects.Instance.FromDict(params[0]) 564 timeout = params[1] 565 return backend.InstanceShutdown(instance, timeout)
566 567 @staticmethod
568 - def perspective_instance_start(params):
569 """Start an instance. 570 571 """ 572 (instance_name, startup_paused) = params 573 instance = objects.Instance.FromDict(instance_name) 574 return backend.StartInstance(instance, startup_paused)
575 576 @staticmethod
577 - def perspective_migration_info(params):
578 """Gather information about an instance to be migrated. 579 580 """ 581 instance = objects.Instance.FromDict(params[0]) 582 return backend.MigrationInfo(instance)
583 584 @staticmethod
585 - def perspective_accept_instance(params):
586 """Prepare the node to accept an instance. 587 588 """ 589 instance, info, target = params 590 instance = objects.Instance.FromDict(instance) 591 return backend.AcceptInstance(instance, info, target)
592 593 @staticmethod
595 """Finalize the instance migration on the destination node. 596 597 """ 598 instance, info, success = params 599 instance = objects.Instance.FromDict(instance) 600 return backend.FinalizeMigrationDst(instance, info, success)
601 602 @staticmethod
603 - def perspective_instance_migrate(params):
604 """Migrates an instance. 605 606 """ 607 instance, target, live = params 608 instance = objects.Instance.FromDict(instance) 609 return backend.MigrateInstance(instance, target, live)
610 611 @staticmethod
613 """Finalize the instance migration on the source node. 614 615 """ 616 instance, success, live = params 617 instance = objects.Instance.FromDict(instance) 618 return backend.FinalizeMigrationSource(instance, success, live)
619 620 @staticmethod
622 """Reports migration status. 623 624 """ 625 instance = objects.Instance.FromDict(params[0]) 626 return backend.GetMigrationStatus(instance).ToDict()
627 628 @staticmethod
629 - def perspective_instance_reboot(params):
630 """Reboot an instance. 631 632 """ 633 instance = objects.Instance.FromDict(params[0]) 634 reboot_type = params[1] 635 shutdown_timeout = params[2] 636 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
637 638 @staticmethod
640 """Modify instance runtime memory. 641 642 """ 643 instance_dict, memory = params 644 instance = objects.Instance.FromDict(instance_dict) 645 return backend.InstanceBalloonMemory(instance, memory)
646 647 @staticmethod
648 - def perspective_instance_info(params):
649 """Query instance information. 650 651 """ 652 return backend.GetInstanceInfo(params[0], params[1])
653 654 @staticmethod
656 """Query whether the specified instance can be migrated. 657 658 """ 659 instance = objects.Instance.FromDict(params[0]) 660 return backend.GetInstanceMigratable(instance)
661 662 @staticmethod
664 """Query information about all instances. 665 666 """ 667 return backend.GetAllInstancesInfo(params[0])
668 669 @staticmethod
670 - def perspective_instance_list(params):
671 """Query the list of running instances. 672 673 """ 674 return backend.GetInstanceList(params[0])
675 676 # node -------------------------- 677 678 @staticmethod
680 """Checks if a node has the given ip address. 681 682 """ 683 return netutils.IPAddress.Own(params[0])
684 685 @staticmethod
686 - def perspective_node_info(params):
687 """Query node information. 688 689 """ 690 (vg_names, hv_names, excl_stor) = params 691 return backend.GetNodeInfo(vg_names, hv_names, excl_stor)
692 693 @staticmethod
694 - def perspective_etc_hosts_modify(params):
695 """Modify a node entry in /etc/hosts. 696 697 """ 698 backend.EtcHostsModify(params[0], params[1], params[2]) 699 700 return True
701 702 @staticmethod
703 - def perspective_node_verify(params):
704 """Run a verify sequence on this node. 705 706 """ 707 return backend.VerifyNode(params[0], params[1])
708 709 @classmethod
710 - def perspective_node_verify_light(cls, params):
711 """Run a light verify sequence on this node. 712 713 """ 714 # So far it's the same as the normal node_verify 715 return cls.perspective_node_verify(params)
716 717 @staticmethod
719 """Start the master daemons on this node. 720 721 """ 722 return backend.StartMasterDaemons(params[0])
723 724 @staticmethod
726 """Activate the master IP on this node. 727 728 """ 729 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 730 return backend.ActivateMasterIp(master_params, params[1])
731 732 @staticmethod
734 """Deactivate the master IP on this node. 735 736 """ 737 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 738 return backend.DeactivateMasterIp(master_params, params[1])
739 740 @staticmethod
741 - def perspective_node_stop_master(params):
742 """Stops master daemons on this node. 743 744 """ 745 return backend.StopMasterDaemons()
746 747 @staticmethod
749 """Change the master IP netmask. 750 751 """ 752 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 753 params[3])
754 755 @staticmethod
757 """Cleanup after leaving a cluster. 758 759 """ 760 return backend.LeaveCluster(params[0])
761 762 @staticmethod
763 - def perspective_node_volumes(params):
764 """Query the list of all logical volume groups. 765 766 """ 767 return backend.NodeVolumes()
768 769 @staticmethod
771 """Demote a node from the master candidate role. 772 773 """ 774 return backend.DemoteFromMC()
775 776 @staticmethod
777 - def perspective_node_powercycle(params):
778 """Tries to powercycle the nod. 779 780 """ 781 hypervisor_type = params[0] 782 return backend.PowercycleNode(hypervisor_type)
783 784 # cluster -------------------------- 785 786 @staticmethod
787 - def perspective_version(params):
788 """Query version information. 789 790 """ 791 return constants.PROTOCOL_VERSION
792 793 @staticmethod
794 - def perspective_upload_file(params):
795 """Upload a file. 796 797 Note that the backend implementation imposes strict rules on which 798 files are accepted. 799 800 """ 801 return backend.UploadFile(*(params[0]))
802 803 @staticmethod
804 - def perspective_master_info(params):
805 """Query master information. 806 807 """ 808 return backend.GetMasterInfo()
809 810 @staticmethod
811 - def perspective_run_oob(params):
812 """Runs oob on node. 813 814 """ 815 output = backend.RunOob(params[0], params[1], params[2], params[3]) 816 if output: 817 result = serializer.LoadJson(output) 818 else: 819 result = None 820 return result
821 822 @staticmethod
824 """Runs a restricted command. 825 826 """ 827 (cmd, ) = params 828 829 return backend.RunRestrictedCmd(cmd)
830 831 @staticmethod
833 """Write ssconf files. 834 835 """ 836 (values,) = params 837 return ssconf.WriteSsconfFiles(values)
838 839 @staticmethod
840 - def perspective_get_watcher_pause(params):
841 """Get watcher pause end. 842 843 """ 844 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
845 846 @staticmethod
847 - def perspective_set_watcher_pause(params):
848 """Set watcher pause. 849 850 """ 851 (until, ) = params 852 return backend.SetWatcherPause(until)
853 854 # os ----------------------- 855 856 @staticmethod
857 - def perspective_os_diagnose(params):
858 """Query detailed information about existing OSes. 859 860 """ 861 return backend.DiagnoseOS()
862 863 @staticmethod
864 - def perspective_os_get(params):
865 """Query information about a given OS. 866 867 """ 868 name = params[0] 869 os_obj = backend.OSFromDisk(name) 870 return os_obj.ToDict()
871 872 @staticmethod
873 - def perspective_os_validate(params):
874 """Run a given OS' validation routine. 875 876 """ 877 required, name, checks, params = params 878 return backend.ValidateOS(required, name, checks, params)
879 880 # extstorage ----------------------- 881 882 @staticmethod
884 """Query detailed information about existing extstorage providers. 885 886 """ 887 return backend.DiagnoseExtStorage()
888 889 # hooks ----------------------- 890 891 @staticmethod
892 - def perspective_hooks_runner(params):
893 """Run hook scripts. 894 895 """ 896 hpath, phase, env = params 897 hr = backend.HooksRunner() 898 return hr.RunHooks(hpath, phase, env)
899 900 # iallocator ----------------- 901 902 @staticmethod
903 - def perspective_iallocator_runner(params):
904 """Run an iallocator script. 905 906 """ 907 name, idata = params 908 iar = backend.IAllocatorRunner() 909 return iar.Run(name, idata)
910 911 # test ----------------------- 912 913 @staticmethod
914 - def perspective_test_delay(params):
915 """Run test delay. 916 917 """ 918 duration = params[0] 919 status, rval = utils.TestDelay(duration) 920 if not status: 921 raise backend.RPCFail(rval) 922 return rval
923 924 # file storage --------------- 925 926 @staticmethod
928 """Create the file storage directory. 929 930 """ 931 file_storage_dir = params[0] 932 return backend.CreateFileStorageDir(file_storage_dir)
933 934 @staticmethod
936 """Remove the file storage directory. 937 938 """ 939 file_storage_dir = params[0] 940 return backend.RemoveFileStorageDir(file_storage_dir)
941 942 @staticmethod
944 """Rename the file storage directory. 945 946 """ 947 old_file_storage_dir = params[0] 948 new_file_storage_dir = params[1] 949 return backend.RenameFileStorageDir(old_file_storage_dir, 950 new_file_storage_dir)
951 952 # jobs ------------------------ 953 954 @staticmethod 955 @_RequireJobQueueLock
956 - def perspective_jobqueue_update(params):
957 """Update job queue. 958 959 """ 960 (file_name, content) = params 961 return backend.JobQueueUpdate(file_name, content)
962 963 @staticmethod 964 @_RequireJobQueueLock
965 - def perspective_jobqueue_purge(params):
966 """Purge job queue. 967 968 """ 969 return backend.JobQueuePurge()
970 971 @staticmethod 972 @_RequireJobQueueLock
973 - def perspective_jobqueue_rename(params):
974 """Rename a job queue file. 975 976 """ 977 # TODO: What if a file fails to rename? 978 return [backend.JobQueueRename(old, new) for old, new in params[0]]
979 980 @staticmethod 981 @_RequireJobQueueLock
983 """Set job queue's drain flag. 984 985 """ 986 (flag, ) = params 987 988 return jstore.SetDrainFlag(flag)
989 990 # hypervisor --------------- 991 992 @staticmethod
994 """Validate the hypervisor parameters. 995 996 """ 997 (hvname, hvparams) = params 998 return backend.ValidateHVParams(hvname, hvparams)
999 1000 # Crypto 1001 1002 @staticmethod
1003 - def perspective_x509_cert_create(params):
1004 """Creates a new X509 certificate for SSL/TLS. 1005 1006 """ 1007 (validity, ) = params 1008 return backend.CreateX509Certificate(validity)
1009 1010 @staticmethod
1011 - def perspective_x509_cert_remove(params):
1012 """Removes a X509 certificate. 1013 1014 """ 1015 (name, ) = params 1016 return backend.RemoveX509Certificate(name)
1017 1018 # Import and export 1019 1020 @staticmethod
1021 - def perspective_import_start(params):
1022 """Starts an import daemon. 1023 1024 """ 1025 (opts_s, instance, component, (dest, dest_args)) = params 1026 1027 opts = objects.ImportExportOptions.FromDict(opts_s) 1028 1029 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, 1030 None, None, 1031 objects.Instance.FromDict(instance), 1032 component, dest, 1033 _DecodeImportExportIO(dest, 1034 dest_args))
1035 1036 @staticmethod
1037 - def perspective_export_start(params):
1038 """Starts an export daemon. 1039 1040 """ 1041 (opts_s, host, port, instance, component, (source, source_args)) = params 1042 1043 opts = objects.ImportExportOptions.FromDict(opts_s) 1044 1045 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1046 host, port, 1047 objects.Instance.FromDict(instance), 1048 component, source, 1049 _DecodeImportExportIO(source, 1050 source_args))
1051 1052 @staticmethod
1053 - def perspective_impexp_status(params):
1054 """Retrieves the status of an import or export daemon. 1055 1056 """ 1057 return backend.GetImportExportStatus(params[0])
1058 1059 @staticmethod
1060 - def perspective_impexp_abort(params):
1061 """Aborts an import or export. 1062 1063 """ 1064 return backend.AbortImportExport(params[0])
1065 1066 @staticmethod
1067 - def perspective_impexp_cleanup(params):
1068 """Cleans up after an import or export. 1069 1070 """ 1071 return backend.CleanupImportExport(params[0])
1072
1073 1074 -def CheckNoded(_, args):
1075 """Initial checks whether to run or exit with a failure. 1076 1077 """ 1078 if args: # noded doesn't take any arguments 1079 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % 1080 sys.argv[0]) 1081 sys.exit(constants.EXIT_FAILURE) 1082 try: 1083 codecs.lookup("string-escape") 1084 except LookupError: 1085 print >> sys.stderr, ("Can't load the string-escape code which is part" 1086 " of the Python installation. Is your installation" 1087 " complete/correct? Aborting.") 1088 sys.exit(constants.EXIT_FAILURE)
1089
1090 1091 -def PrepNoded(options, _):
1092 """Preparation node daemon function, executed with the PID file held. 1093 1094 """ 1095 if options.mlock: 1096 request_executor_class = MlockallRequestExecutor 1097 try: 1098 utils.Mlockall() 1099 except errors.NoCtypesError: 1100 logging.warning("Cannot set memory lock, ctypes module not found") 1101 request_executor_class = http.server.HttpServerRequestExecutor 1102 else: 1103 request_executor_class = http.server.HttpServerRequestExecutor 1104 1105 # Read SSL certificate 1106 if options.ssl: 1107 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, 1108 ssl_cert_path=options.ssl_cert) 1109 else: 1110 ssl_params = None 1111 1112 err = _PrepareQueueLock() 1113 if err is not None: 1114 # this might be some kind of file-system/permission error; while 1115 # this breaks the job queue functionality, we shouldn't prevent 1116 # startup of the whole node daemon because of this 1117 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err) 1118 1119 handler = NodeRequestHandler() 1120 1121 mainloop = daemon.Mainloop() 1122 server = \ 1123 http.server.HttpServer(mainloop, options.bind_address, options.port, 1124 handler, ssl_params=ssl_params, ssl_verify_peer=True, 1125 request_executor_class=request_executor_class) 1126 server.Start() 1127 1128 return (mainloop, server)
1129
1130 1131 -def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1132 """Main node daemon function, executed with the PID file held. 1133 1134 """ 1135 (mainloop, server) = prep_data 1136 try: 1137 mainloop.Run() 1138 finally: 1139 server.Stop() 1140
1141 1142 -def Main():
1143 """Main function for the node daemon. 1144 1145 """ 1146 parser = OptionParser(description="Ganeti node daemon", 1147 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]", 1148 version="%%prog (ganeti) %s" % 1149 constants.RELEASE_VERSION) 1150 parser.add_option("--no-mlock", dest="mlock", 1151 help="Do not mlock the node memory in ram", 1152 default=True, action="store_false") 1153 1154 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded, 1155 default_ssl_cert=pathutils.NODED_CERT_FILE, 1156 default_ssl_key=pathutils.NODED_CERT_FILE, 1157 console_logging=True)
1158