Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
  32  import os 
  33  import sys 
  34  import logging 
  35  import signal 
  36  import codecs 
  37   
  38  from optparse import OptionParser 
  39   
  40  from ganeti import backend 
  41  from ganeti import constants 
  42  from ganeti import objects 
  43  from ganeti import errors 
  44  from ganeti import jstore 
  45  from ganeti import daemon 
  46  from ganeti import http 
  47  from ganeti import utils 
  48  from ganeti import storage 
  49  from ganeti import serializer 
  50  from ganeti import netutils 
  51   
  52  import ganeti.http.server # pylint: disable=W0611 
  53   
  54   
  55  queue_lock = None 
56 57 58 -def _PrepareQueueLock():
59 """Try to prepare the queue lock. 60 61 @return: None for success, otherwise an exception object 62 63 """ 64 global queue_lock # pylint: disable=W0603 65 66 if queue_lock is not None: 67 return None 68 69 # Prepare job queue 70 try: 71 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 72 return None 73 except EnvironmentError, err: 74 return err
75
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Initializes the queue lock if necessary and holds it in exclusive
  mode around the wrapped call.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _locked_call(*args, **kwargs):
    # Several children could be running at the same time, so lock in
    # exclusive, blocking mode, waiting up to QUEUE_LOCK_TIMEOUT seconds
    err = _PrepareQueueLock()
    if err is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _locked_call
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: the I/O type (one of the C{constants.IEIO_*} values)
  @param ieioargs: the serialized arguments for that I/O type
  @return: the decoded arguments tuple; disks are unserialized into
      L{objects.Disk} instances, anything else is passed through

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    return ieioargs
111
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Custom Request Executor class that ensures NodeHttpServer children are
  locked in ram.

  """
  def __init__(self, *args, **kwargs):
    # Lock this process' memory before the parent class starts handling
    # the request
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface; each RPC
  call maps to a C{perspective_*} static method which receives the
  JSON-decoded request body as its single C{params} argument.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remembered so QuitGanetiException handling can signal the daemon
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Dispatches to the C{perspective_<path>} method matching the request
    path and returns a JSON-serialized (success, payload) result.

    """
    # FIXME: Remove HTTP_PUT in Ganeti 2.7
    if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
      raise http.HttpBadRequest("Only PUT and POST methods are supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename block devices.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    node_name = netutils.Hostname.GetSysName()

    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in node_disks.get(node_name, [])]

    result = []

    # on failure, "status" is the error message, not an object with ToDict
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    return backend.BlockdevGrow(cfbd, amount, dryrun)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      # entries are either a serialized disk or a plain boolean flag
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------
  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    # note: despite the name, instance_name holds a serialized instance
    (instance_name, startup_paused) = params
    instance = objects.Instance.FromDict(instance_name)
    return backend.StartInstance(instance, startup_paused)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
                            live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
968
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with L{constants.EXIT_FAILURE} when arguments are given or the
  required codec is missing.

  """
  if args: # noded doesn't take any arguments
    sys.stderr.write(("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                      sys.argv[0]) + "\n")
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    sys.stderr.write(("Can't load the string-escape code which is part"
                      " of the Python installation. Is your installation"
                      " complete/correct? Aborting.") + "\n")
    sys.exit(constants.EXIT_FAILURE)
985
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: a (mainloop, server) tuple, later consumed by L{ExecNoded}

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      # fall back to the plain executor when mlock is unavailable
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  return (mainloop, server)
1021
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: the (mainloop, server) tuple returned by L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # make sure the server is always shut down, even on mainloop errors
    server.Stop()
def Main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  main loop.

  """
  parser = OptionParser(
    description="Ganeti node daemon",
    usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
    version="%%prog (ganeti) %s" % constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
1050