Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable-msg=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
  32  import os 
  33  import sys 
  34  import logging 
  35  import signal 
  36  import codecs 
  37   
  38  from optparse import OptionParser 
  39   
  40  from ganeti import backend 
  41  from ganeti import constants 
  42  from ganeti import objects 
  43  from ganeti import errors 
  44  from ganeti import jstore 
  45  from ganeti import daemon 
  46  from ganeti import http 
  47  from ganeti import utils 
  48  from ganeti import storage 
  49  from ganeti import serializer 
  50  from ganeti import netutils 
  51   
  52  import ganeti.http.server # pylint: disable-msg=W0611 
  53   
  54   
  55  queue_lock = None 
56 57 58 -def _PrepareQueueLock():
59 """Try to prepare the queue lock. 60 61 @return: None for success, otherwise an exception object 62 63 """ 64 global queue_lock # pylint: disable-msg=W0603 65 66 if queue_lock is not None: 67 return None 68 69 # Prepare job queue 70 try: 71 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 72 return None 73 except EnvironmentError, err: 74 return err
75
def _RequireJobQueueLock(fn):
  """Decorator holding the job queue lock around I{fn}.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _LockingWrapper(*args, **kwargs):
    # Several children may run at the same time, hence exclusive
    # blocking mode with a bounded (10 second) wait.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _LockingWrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: I/O arguments as received over the wire
  @return: arguments with serialized disks turned into L{objects.Disk}

  """
  if ieio == constants.IEIO_RAW_DISK:
    # A single serialized disk
    assert len(ieioargs) == 1
    result = (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    # A serialized disk plus one extra argument
    assert len(ieioargs) == 2
    result = (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    # All other I/O types are passed through unchanged
    result = ieioargs

  return result
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Request executor which locks the process' memory in RAM.

  Ensures NodeHttpServer children are locked in ram.

  """
  def __init__(self, *args, **kwargs):
    # Lock all memory before delegating to the standard executor
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # Remembered so that QuitGanetiException handling can signal our
    # own process (see HandleRequest)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Dispatches PUT requests to the C{perspective_*} method named after
    the request path; the JSON-decoded body is passed as the single
    C{params} argument and the result is returned JSON-serialized.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # deliberate catch-all at the RPC boundary; the error is
      # reported back to the caller instead of killing the daemon
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for the disks belonging to this node.

    The single parameter is a map of node name to serialized disks;
    only the entry for this node's own name is looked at.

    """
    (node_disks, ) = params

    node_name = netutils.Hostname.GetSysName()

    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in node_disks.get(node_name, [])]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        # on failure, status has no ToDict and is passed through as-is
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        # boolean entries are passed through without deserialization
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
                            live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with a failure code when unexpected command line
  arguments are given or the codec machinery is broken.

  """
  def _Abort(msg):
    # Report the problem on stderr and exit with a failure code
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if args: # noded doesn't take any arguments
    _Abort("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0])

  try:
    codecs.lookup("string-escape")
  except LookupError:
    _Abort("Can't load the string-escape code which is part"
           " of the Python installation. Is your installation"
           " complete/correct? Aborting.")
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: a (mainloop, server) tuple, consumed by L{ExecNoded}

  """
  executor_cls = http.server.HttpServerRequestExecutor

  if options.mlock:
    try:
      utils.Mlockall()
      # Only use the mlock'ing executor when mlock actually works
      executor_cls = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=executor_cls)
  server.Start()

  return (mainloop, server)
def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: the (mainloop, server) tuple built by L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    # Blocks until the daemon is told to terminate
    mainloop.Run()
  finally:
    server.Stop()
def Main():
  """Main function for the node daemon.

  """
  version_str = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version=version_str)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # GenericMain takes care of daemonization, logging and the main loop
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)