Package ganeti :: Package server :: Module noded
[hide private]
[frames | no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable-msg=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
  32  import os 
  33  import sys 
  34  import logging 
  35  import signal 
  36   
  37  from optparse import OptionParser 
  38   
  39  from ganeti import backend 
  40  from ganeti import constants 
  41  from ganeti import objects 
  42  from ganeti import errors 
  43  from ganeti import jstore 
  44  from ganeti import daemon 
  45  from ganeti import http 
  46  from ganeti import utils 
  47  from ganeti import storage 
  48  from ganeti import serializer 
  49  from ganeti import netutils 
  50   
  51  import ganeti.http.server # pylint: disable-msg=W0611 
  52   
  53   
# Job queue lock object; stays None until _PrepareQueueLock() manages to
# initialise it through jstore.InitAndVerifyQueue().
queue_lock = None
def _PrepareQueueLock():
  """Initialise the module-level job queue lock if it does not exist yet.

  Once the lock has been created, further calls return immediately.

  @return: None on success (or when the lock already exists), otherwise
      the EnvironmentError instance raised while preparing the queue

  """
  global queue_lock # pylint: disable-msg=W0603

  if queue_lock is None:
    # Prepare job queue
    try:
      queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    except EnvironmentError:
      # Fetch the active exception through sys.exc_info() so that no
      # particular "except" binding syntax is needed
      return sys.exc_info()[1]

  return None
74
def _RequireJobQueueLock(fn):
  """Decorator running job queue manipulating functions under the queue lock.

  The wrapped function is only called while the module-level queue lock
  is held exclusively; the lock is released again whether the function
  returns or raises.

  @param fn: the function to protect
  @return: wrapper calling C{fn} with unchanged arguments
  @raise errors.JobQueueError: at call time, if the queue lock could not
      be prepared

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    init_error = _PrepareQueueLock()
    if init_error is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")

    # Exclusive and blocking, because several children could be running
    # at the same time; the wait is limited to QUEUE_LOCK_TIMEOUT seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      result = fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

    return result

  return wrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Turn serialized import/export I/O arguments back into objects.

  @param ieio: I/O type, compared against constants.IEIO_RAW_DISK and
      constants.IEIO_SCRIPT
  @param ieioargs: arguments belonging to that I/O type
  @return: for raw disks a one-element tuple with the disk object, for
      scripts a tuple of disk object and the second argument; for any
      other I/O type C{ieioargs} is returned untouched

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    return ieioargs
110
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Request executor that locks its memory when instantiated.

  This ensures the NodeHttpServer children are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    """Call utils.Mlockall(), then run the parent class initialisation.

    All arguments are forwarded unchanged to the parent class.

    """
    utils.Mlockall()

    parent = http.server.HttpServerRequestExecutor
    parent.__init__(self, *args, **kwargs)
121
122 123 -class NodeHttpServer(http.server.HttpServer):
124 """The server implementation. 125 126 This class holds all methods exposed over the RPC interface. 127 128 """ 129 # too many public methods, and unused args - all methods get params 130 # due to the API 131 # pylint: disable-msg=R0904,W0613
def __init__(self, *args, **kwargs):
  """Initialise the HTTP server and remember the current process ID.

  All arguments are passed through to http.server.HttpServer. The stored
  PID is the target of the SIGTERM sent from HandleRequest on shutdown.

  """
  http.server.HttpServer.__init__(self, *args, **kwargs)
  # PID of the process creating the server
  self.noded_pid = os.getpid()
135
def HandleRequest(self, req):
  """Dispatch an RPC request to the matching perspective_* method.

  Only PUT requests are accepted. The request path, without one leading
  slash, selects the method; the JSON-decoded request body is its only
  argument.

  @param req: request object providing request_method, request_path and
      request_body
  @return: the JSON-serialized result, normally a (success, payload) pair
  @raise http.HttpBadRequest: if the request method is not PUT
  @raise http.HttpNotFound: if no method exists for the requested path

  """
  if req.request_method.upper() != http.HTTP_PUT:
    raise http.HttpBadRequest()

  name = req.request_path
  if name.startswith("/"):
    name = name[1:]

  handler = getattr(self, "perspective_%s" % name, None)
  if handler is None:
    raise http.HttpNotFound()

  # The active exception is fetched through sys.exc_info() in the handlers
  # below, which does not depend on a particular "except" binding syntax
  try:
    payload = handler(serializer.LoadJson(req.request_body))
    result = (True, payload)

  except backend.RPCFail:
    # our custom failure exception; str() works fine if the exception
    # was constructed with a single argument
    result = (False, str(sys.exc_info()[1]))
  except errors.QuitGanetiException:
    quit_err = sys.exc_info()[1]
    # Tell parent to quit
    logging.info("Shutting down the node daemon, arguments: %s",
                 str(quit_err.args))
    os.kill(self.noded_pid, signal.SIGTERM)
    # The error's arguments are returned as they are; they must already
    # be in the correct tuple format
    result = quit_err.args
  except Exception:
    err = sys.exc_info()[1]
    logging.exception("Error in RPC call")
    result = (False, "Error while executing backend function: %s" % str(err))

  return serializer.DumpJson(result, indent=False)
# the new block devices --------------------------

@staticmethod
def perspective_blockdev_create(params):
  """Create a block device from its serialized description.

  @param params: (serialized disk, size, owner, on_primary, info)
  @return: result of backend.BlockdevCreate
  @raise ValueError: if the disk could not be unserialized

  """
  (disk_dict, size, owner, on_primary, info) = params
  disk = objects.Disk.FromDict(disk_dict)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevCreate(disk, size, owner, on_primary, info)
@staticmethod
def perspective_blockdev_wipe(params):
  """Wipe a block device.

  @param params: (serialized disk, offset, size)
  @return: result of backend.BlockdevWipe

  """
  (disk_dict, offset, size) = params
  disk = objects.Disk.FromDict(disk_dict)
  return backend.BlockdevWipe(disk, offset, size)
@staticmethod
def perspective_blockdev_remove(params):
  """Remove the block device whose serialized form is the first parameter.

  @return: result of backend.BlockdevRemove

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.BlockdevRemove(disk)
@staticmethod
def perspective_blockdev_rename(params):
  """Rename block devices.

  @param params: sequence of (serialized disk, uid) pairs
  @return: result of backend.BlockdevRename

  """
  devlist = []
  for (disk_dict, uid) in params:
    devlist.append((objects.Disk.FromDict(disk_dict), uid))
  return backend.BlockdevRename(devlist)
@staticmethod
def perspective_blockdev_assemble(params):
  """Assemble a block device.

  @param params: (serialized disk, owner, on_primary)
  @return: result of backend.BlockdevAssemble
  @raise ValueError: if the disk could not be unserialized

  """
  (disk_dict, owner, on_primary) = params
  disk = objects.Disk.FromDict(disk_dict)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevAssemble(disk, owner, on_primary)
@staticmethod
def perspective_blockdev_shutdown(params):
  """Shut down the block device given as first parameter.

  @return: result of backend.BlockdevShutdown
  @raise ValueError: if the disk could not be unserialized

  """
  disk = objects.Disk.FromDict(params[0])
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.BlockdevShutdown(disk)
233 234 @staticmethod
236 """Add a child to a mirror device. 237 238 Note: this is only valid for mirror devices. It's the caller's duty 239 to send a correct disk, otherwise we raise an error. 240 241 """ 242 bdev_s, ndev_s = params 243 bdev = objects.Disk.FromDict(bdev_s) 244 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 245 if bdev is None or ndevs.count(None) > 0: 246 raise ValueError("can't unserialize data!") 247 return backend.BlockdevAddchildren(bdev, ndevs)
248 249 @staticmethod
251 """Remove a child from a mirror device. 252 253 This is only valid for mirror devices, of course. It's the callers 254 duty to send a correct disk, otherwise we raise an error. 255 256 """ 257 bdev_s, ndev_s = params 258 bdev = objects.Disk.FromDict(bdev_s) 259 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 260 if bdev is None or ndevs.count(None) > 0: 261 raise ValueError("can't unserialize data!") 262 return backend.BlockdevRemovechildren(bdev, ndevs)
263 264 @staticmethod
266 """Return the mirror status for a list of disks. 267 268 """ 269 disks = [objects.Disk.FromDict(dsk_s) 270 for dsk_s in params] 271 return [status.ToDict() 272 for status in backend.BlockdevGetmirrorstatus(disks)]
273 274 @staticmethod
276 """Return the mirror status for a list of disks. 277 278 """ 279 (node_disks, ) = params 280 281 node_name = netutils.Hostname.GetSysName() 282 283 disks = [objects.Disk.FromDict(dsk_s) 284 for dsk_s in node_disks.get(node_name, [])] 285 286 result = [] 287 288 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 289 if success: 290 result.append((success, status.ToDict())) 291 else: 292 result.append((success, status)) 293 294 return result
@staticmethod
def perspective_blockdev_find(params):
  """Expose backend.BlockdevFind for the disk given as first parameter.

  This tries to find the disk, but does not activate it.

  @return: None if the backend returned None, otherwise the dictionary
      form of the backend's result

  """
  disk = objects.Disk.FromDict(params[0])

  status = backend.BlockdevFind(disk)
  if status is not None:
    return status.ToDict()

  return None
@staticmethod
def perspective_blockdev_snapshot(params):
  """Create a snapshot of the disk given as first parameter.

  Only LVM disks can be snapshotted; anything else leads to an
  exception. The snapshot device can be removed again through the
  generic block device remove call.

  @return: result of backend.BlockdevSnapshot

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.BlockdevSnapshot(disk)
@staticmethod
def perspective_blockdev_grow(params):
  """Grow a stack of devices.

  @param params: serialized disk at index 0, growth amount at index 1
  @return: result of backend.BlockdevGrow

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.BlockdevGrow(disk, params[1])
@staticmethod
def perspective_blockdev_close(params):
  """Close the given block devices.

  @param params: index 1 holds the list of serialized disks; index 0 is
      passed to the backend unchanged as first argument
  @return: result of backend.BlockdevClose

  """
  disks = []
  for disk_dict in params[1]:
    disks.append(objects.Disk.FromDict(disk_dict))
  return backend.BlockdevClose(params[0], disks)
@staticmethod
def perspective_blockdev_getsize(params):
  """Compute the sizes of the given block devices.

  @param params: one-element sequence holding the list of serialized disks
  @return: result of backend.BlockdevGetsize

  """
  disks = []
  for disk_dict in params[0]:
    disks.append(objects.Disk.FromDict(disk_dict))
  return backend.BlockdevGetsize(disks)
@staticmethod
def perspective_blockdev_export(params):
  """Export a block device to a path on another node.

  @param params: (serialized disk, dest_node, dest_path, cluster_name)
  @return: result of backend.BlockdevExport

  """
  disk = objects.Disk.FromDict(params[0])
  (dest_node, dest_path, cluster_name) = params[1:]
  return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
356 357 # blockdev/drbd specific methods ---------- 358 359 @staticmethod
361 """Disconnects the network connection of drbd disks. 362 363 Note that this is only valid for drbd disks, so the members of the 364 disk list must all be drbd devices. 365 366 """ 367 nodes_ip, disks = params 368 disks = [objects.Disk.FromDict(cf) for cf in disks] 369 return backend.DrbdDisconnectNet(nodes_ip, disks)
@staticmethod
def perspective_drbd_attach_net(params):
  """Attach the network connection of drbd disks.

  This is only valid for drbd disks, so every member of the disk list
  must be a drbd device.

  @param params: (nodes_ip, serialized disks, instance_name, multimaster)
  @return: result of backend.DrbdAttachNet

  """
  (nodes_ip, disk_dicts, instance_name, multimaster) = params
  disks = []
  for disk_dict in disk_dicts:
    disks.append(objects.Disk.FromDict(disk_dict))
  return backend.DrbdAttachNet(nodes_ip, disks, instance_name, multimaster)
@staticmethod
def perspective_drbd_wait_sync(params):
  """Wait until DRBD disks are synchronised.

  This is only valid for drbd disks, so every member of the disk list
  must be a drbd device.

  @param params: (nodes_ip, serialized disks)
  @return: result of backend.DrbdWaitSync

  """
  (nodes_ip, disk_dicts) = params
  disks = []
  for disk_dict in disk_dicts:
    disks.append(objects.Disk.FromDict(disk_dict))
  return backend.DrbdWaitSync(nodes_ip, disks)
@staticmethod
def perspective_drbd_helper(params):
  """Query the drbd usermode helper.

  C{params} is not used.

  @return: result of backend.GetDrbdUsermodeHelper

  """
  helper = backend.GetDrbdUsermodeHelper()
  return helper
# export/import --------------------------

@staticmethod
def perspective_finalize_export(params):
  """Expose the finalize export functionality.

  @param params: serialized instance at index 0, list of snapshot disks
      at index 1; boolean entries in that list are kept as they are, all
      others are unserialized into disk objects
  @return: result of backend.FinalizeExport

  """
  instance = objects.Instance.FromDict(params[0])

  snap_disks = []
  for entry in params[1]:
    if not isinstance(entry, bool):
      entry = objects.Disk.FromDict(entry)
    snap_disks.append(entry)

  return backend.FinalizeExport(instance, snap_disks)
@staticmethod
def perspective_export_info(params):
  """Query information about an existing export on this node.

  The path passed as first parameter may not contain an export, in
  which case None is returned.

  @return: result of backend.ExportInfo

  """
  return backend.ExportInfo(params[0])
@staticmethod
def perspective_export_list(params):
  """List the exports available on this node.

  Unlike export_info, which may query an export in any path, only the
  standard Ganeti path (constants.EXPORT_DIR) is looked at. C{params}
  is not used.

  @return: result of backend.ListExports

  """
  exports = backend.ListExports()
  return exports
@staticmethod
def perspective_export_remove(params):
  """Remove the export named by the first parameter.

  @return: result of backend.RemoveExport

  """
  return backend.RemoveExport(params[0])
# volume --------------------------

@staticmethod
def perspective_lv_list(params):
  """Query the logical volumes of the volume group given as first parameter.

  @return: result of backend.GetVolumeList

  """
  return backend.GetVolumeList(params[0])
@staticmethod
def perspective_vg_list(params):
  """Query the list of volume groups.

  C{params} is not used.

  @return: result of backend.ListVolumeGroups

  """
  volume_groups = backend.ListVolumeGroups()
  return volume_groups
# Storage --------------------------

@staticmethod
def perspective_storage_list(params):
  """Get the list of storage units.

  @param params: (su_name, su_args, name, fields); the first two select
      the storage object, the last two go to its List method
  @return: result of the storage object's List call

  """
  (su_name, su_args, name, fields) = params
  unit = storage.GetStorage(su_name, *su_args)
  return unit.List(name, fields)
@staticmethod
def perspective_storage_modify(params):
  """Modify a storage unit.

  @param params: (su_name, su_args, name, changes); the first two select
      the storage object, the last two go to its Modify method
  @return: result of the storage object's Modify call

  """
  (su_name, su_args, name, changes) = params
  unit = storage.GetStorage(su_name, *su_args)
  return unit.Modify(name, changes)
@staticmethod
def perspective_storage_execute(params):
  """Execute an operation on a storage unit.

  @param params: (su_name, su_args, name, op); the first two select the
      storage object, the last two go to its Execute method
  @return: result of the storage object's Execute call

  """
  (su_name, su_args, name, op) = params
  unit = storage.GetStorage(su_name, *su_args)
  return unit.Execute(name, op)
# bridge --------------------------

@staticmethod
def perspective_bridges_exist(params):
  """Check whether all bridges listed in the first parameter exist here.

  @return: result of backend.BridgesExist

  """
  return backend.BridgesExist(params[0])
# instance --------------------------

@staticmethod
def perspective_instance_os_add(params):
  """Install an OS on a given instance.

  @param params: serialized instance, reinstall flag and debug value at
      indices 0, 1 and 2
  @return: result of backend.InstanceOsAdd

  """
  instance = objects.Instance.FromDict(params[0])
  return backend.InstanceOsAdd(instance, params[1], params[2])
516 517 @staticmethod
519 """Runs the OS rename script for an instance. 520 521 """ 522 inst_s, old_name, debug = params 523 inst = objects.Instance.FromDict(inst_s) 524 return backend.RunRenameInstance(inst, old_name, debug)
@staticmethod
def perspective_instance_shutdown(params):
  """Shut down an instance.

  @param params: serialized instance at index 0, timeout at index 1
  @return: result of backend.InstanceShutdown

  """
  instance = objects.Instance.FromDict(params[0])
  return backend.InstanceShutdown(instance, params[1])
@staticmethod
def perspective_instance_start(params):
  """Start the instance given, in serialized form, as first parameter.

  @return: result of backend.StartInstance

  """
  return backend.StartInstance(objects.Instance.FromDict(params[0]))
@staticmethod
def perspective_migration_info(params):
  """Gather information about an instance that is to be migrated.

  @param params: serialized instance at index 0
  @return: result of backend.MigrationInfo

  """
  return backend.MigrationInfo(objects.Instance.FromDict(params[0]))
@staticmethod
def perspective_accept_instance(params):
  """Prepare the node to accept an instance.

  @param params: (serialized instance, info, target)
  @return: result of backend.AcceptInstance

  """
  (instance_dict, info, target) = params
  instance = objects.Instance.FromDict(instance_dict)
  return backend.AcceptInstance(instance, info, target)
559 560 @staticmethod
562 """Finalize the instance migration. 563 564 """ 565 instance, info, success = params 566 instance = objects.Instance.FromDict(instance) 567 return backend.FinalizeMigration(instance, info, success)
@staticmethod
def perspective_instance_migrate(params):
  """Migrate an instance.

  @param params: (serialized instance, target, live)
  @return: result of backend.MigrateInstance

  """
  (instance_dict, target, live) = params
  instance = objects.Instance.FromDict(instance_dict)
  return backend.MigrateInstance(instance, target, live)
@staticmethod
def perspective_instance_reboot(params):
  """Reboot an instance.

  @param params: serialized instance, reboot type and shutdown timeout
      at indices 0, 1 and 2
  @return: result of backend.InstanceReboot

  """
  instance = objects.Instance.FromDict(params[0])
  return backend.InstanceReboot(instance, params[1], params[2])
@staticmethod
def perspective_instance_info(params):
  """Query instance information.

  The first two parameters are handed to the backend unchanged.

  @return: result of backend.GetInstanceInfo

  """
  info = backend.GetInstanceInfo(params[0], params[1])
  return info
594 595 @staticmethod
597 """Query whether the specified instance can be migrated. 598 599 """ 600 instance = objects.Instance.FromDict(params[0]) 601 return backend.GetInstanceMigratable(instance)
602 603 @staticmethod
605 """Query information about all instances. 606 607 """ 608 return backend.GetAllInstancesInfo(params[0])
@staticmethod
def perspective_instance_list(params):
  """Query the list of running instances.

  The first parameter is handed to the backend unchanged.

  @return: result of backend.GetInstanceList

  """
  running = backend.GetInstanceList(params[0])
  return running
# node --------------------------

@staticmethod
def perspective_node_tcp_ping(params):
  """Do a TcpPing on the remote node.

  @param params: source at index 0, followed by the two positional
      TcpPing arguments, the timeout and the live_port_needed flag
  @return: result of netutils.TcpPing

  """
  ping_kwargs = {
    "timeout": params[3],
    "live_port_needed": params[4],
    "source": params[0],
    }
  return netutils.TcpPing(params[1], params[2], **ping_kwargs)
626 627 @staticmethod
629 """Checks if a node has the given ip address. 630 631 """ 632 return netutils.IPAddress.Own(params[0])
@staticmethod
def perspective_node_info(params):
  """Query node information.

  @param params: (vgname, hypervisor_type)
  @return: result of backend.GetNodeInfo

  """
  (vgname, hypervisor_type) = params
  info = backend.GetNodeInfo(vgname, hypervisor_type)
  return info
@staticmethod
def perspective_etc_hosts_modify(params):
  """Modify a node entry in /etc/hosts.

  The first three parameters are handed to backend.EtcHostsModify.

  @return: always True; the backend's return value is discarded

  """
  (first, second, third) = (params[0], params[1], params[2])
  backend.EtcHostsModify(first, second, third)

  return True
@staticmethod
def perspective_node_verify(params):
  """Run a verify sequence on this node.

  The first two parameters are handed to the backend unchanged.

  @return: result of backend.VerifyNode

  """
  outcome = backend.VerifyNode(params[0], params[1])
  return outcome
@staticmethod
def perspective_node_start_master(params):
  """Promote this node to master status.

  The first two parameters are handed to the backend unchanged.

  @return: result of backend.StartMaster

  """
  outcome = backend.StartMaster(params[0], params[1])
  return outcome
@staticmethod
def perspective_node_stop_master(params):
  """Demote this node from master status.

  The first parameter is handed to the backend unchanged.

  @return: result of backend.StopMaster

  """
  outcome = backend.StopMaster(params[0])
  return outcome
671 672 @staticmethod
674 """Cleanup after leaving a cluster. 675 676 """ 677 return backend.LeaveCluster(params[0])
@staticmethod
def perspective_node_volumes(params):
  """Query the volumes of this node through backend.NodeVolumes.

  C{params} is not used.

  @return: result of backend.NodeVolumes

  """
  volumes = backend.NodeVolumes()
  return volumes
685 686 @staticmethod
688 """Demote a node from the master candidate role. 689 690 """ 691 return backend.DemoteFromMC()
@staticmethod
def perspective_node_powercycle(params):
  """Try to powercycle the node.

  @param params: hypervisor type at index 0
  @return: result of backend.PowercycleNode

  """
  return backend.PowercycleNode(params[0])
# cluster --------------------------

@staticmethod
def perspective_version(params):
  """Query version information.

  C{params} is not used.

  @return: constants.PROTOCOL_VERSION

  """
  protocol_version = constants.PROTOCOL_VERSION
  return protocol_version
@staticmethod
def perspective_upload_file(params):
  """Upload a file.

  The backend implementation imposes strict rules on which files are
  accepted. All parameters become positional arguments of
  backend.UploadFile.

  @return: result of backend.UploadFile

  """
  outcome = backend.UploadFile(*params)
  return outcome
@staticmethod
def perspective_master_info(params):
  """Query master information.

  C{params} is not used.

  @return: result of backend.GetMasterInfo

  """
  master_info = backend.GetMasterInfo()
  return master_info
728 729 @staticmethod
731 """Write ssconf files. 732 733 """ 734 (values,) = params 735 return backend.WriteSsconfFiles(values)
# os -----------------------

@staticmethod
def perspective_os_diagnose(params):
  """Query detailed information about existing OSes.

  C{params} is not used.

  @return: result of backend.DiagnoseOS

  """
  diagnosis = backend.DiagnoseOS()
  return diagnosis
@staticmethod
def perspective_os_get(params):
  """Query information about the OS named by the first parameter.

  @return: dictionary form of the object returned by backend.OSFromDisk

  """
  return backend.OSFromDisk(params[0]).ToDict()
@staticmethod
def perspective_os_validate(params):
  """Run a given OS' validation routine.

  @param params: (required, name, checks, OS parameters)
  @return: result of backend.ValidateOS

  """
  # A separate local name keeps the RPC argument list distinguishable
  # from the OS parameters it contains
  (required, name, checks, osparams) = params
  return backend.ValidateOS(required, name, checks, osparams)
# hooks -----------------------

@staticmethod
def perspective_hooks_runner(params):
  """Run hook scripts.

  @param params: (hpath, phase, env)
  @return: result of RunHooks on a fresh backend.HooksRunner

  """
  (hpath, phase, env) = params
  return backend.HooksRunner().RunHooks(hpath, phase, env)
# iallocator -----------------

@staticmethod
def perspective_iallocator_runner(params):
  """Run an iallocator script.

  @param params: (name, idata)
  @return: result of Run on a fresh backend.IAllocatorRunner

  """
  (name, idata) = params
  return backend.IAllocatorRunner().Run(name, idata)
# test -----------------------

@staticmethod
def perspective_test_delay(params):
  """Run test delay.

  @param params: duration at index 0
  @return: second element of the utils.TestDelay result
  @raise backend.RPCFail: if utils.TestDelay reports a false status; the
      second element of its result becomes the exception argument

  """
  (status, rval) = utils.TestDelay(params[0])
  if status:
    return rval
  raise backend.RPCFail(rval)
797 798 # file storage --------------- 799 800 @staticmethod
802 """Create the file storage directory. 803 804 """ 805 file_storage_dir = params[0] 806 return backend.CreateFileStorageDir(file_storage_dir)
807 808 @staticmethod
810 """Remove the file storage directory. 811 812 """ 813 file_storage_dir = params[0] 814 return backend.RemoveFileStorageDir(file_storage_dir)
815 816 @staticmethod
818 """Rename the file storage directory. 819 820 """ 821 old_file_storage_dir = params[0] 822 new_file_storage_dir = params[1] 823 return backend.RenameFileStorageDir(old_file_storage_dir, 824 new_file_storage_dir)
# jobs ------------------------

@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_update(params):
  """Update a job queue file.

  @param params: (file_name, content)
  @return: result of backend.JobQueueUpdate

  """
  (file_name, content) = params
  outcome = backend.JobQueueUpdate(file_name, content)
  return outcome
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_purge(params):
  """Purge the job queue.

  C{params} is not used.

  @return: result of backend.JobQueuePurge

  """
  outcome = backend.JobQueuePurge()
  return outcome
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_rename(params):
  """Rename job queue files.

  @param params: sequence of (old, new) pairs
  @return: list with the backend.JobQueueRename result for each pair

  """
  # TODO: What if a file fails to rename?
  results = []
  for (old, new) in params:
    results.append(backend.JobQueueRename(old, new))
  return results
853 854 # hypervisor --------------- 855 856 @staticmethod
858 """Validate the hypervisor parameters. 859 860 """ 861 (hvname, hvparams) = params 862 return backend.ValidateHVParams(hvname, hvparams)
# Crypto

@staticmethod
def perspective_x509_cert_create(params):
  """Create a new X509 certificate for SSL/TLS.

  @param params: one-element sequence holding the validity
  @return: result of backend.CreateX509Certificate

  """
  (validity, ) = params
  certificate = backend.CreateX509Certificate(validity)
  return certificate
@staticmethod
def perspective_x509_cert_remove(params):
  """Remove an X509 certificate.

  @param params: one-element sequence holding the certificate name
  @return: result of backend.RemoveX509Certificate

  """
  (name, ) = params
  outcome = backend.RemoveX509Certificate(name)
  return outcome
# Import and export

@staticmethod
def perspective_import_start(params):
  """Start an import daemon.

  @param params: (serialized options, serialized instance, dest,
      dest_args)
  @return: result of backend.StartImportExportDaemon

  """
  (opts_dict, instance_dict, dest, dest_args) = params

  opts = objects.ImportExportOptions.FromDict(opts_dict)
  instance = objects.Instance.FromDict(instance_dict)
  dest_io = _DecodeImportExportIO(dest, dest_args)

  # No host and port are passed for imports
  return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                         None, None,
                                         instance, dest, dest_io)
@staticmethod
def perspective_export_start(params):
  """Start an export daemon.

  @param params: (serialized options, host, port, serialized instance,
      source, source_args)
  @return: result of backend.StartImportExportDaemon

  """
  (opts_dict, host, port, instance_dict, source, source_args) = params

  opts = objects.ImportExportOptions.FromDict(opts_dict)
  instance = objects.Instance.FromDict(instance_dict)
  source_io = _DecodeImportExportIO(source, source_args)

  return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                         host, port,
                                         instance, source, source_io)
@staticmethod
def perspective_impexp_status(params):
  """Retrieve the status of an import or export daemon.

  The first parameter is handed to the backend unchanged.

  @return: result of backend.GetImportExportStatus

  """
  status = backend.GetImportExportStatus(params[0])
  return status
@staticmethod
def perspective_impexp_abort(params):
  """Abort an import or export.

  The first parameter is handed to the backend unchanged.

  @return: result of backend.AbortImportExport

  """
  outcome = backend.AbortImportExport(params[0])
  return outcome
@staticmethod
def perspective_impexp_cleanup(params):
  """Clean up after an import or export.

  The first parameter is handed to the backend unchanged.

  @return: result of backend.CleanupImportExport

  """
  outcome = backend.CleanupImportExport(params[0])
  return outcome
936
def CheckNoded(_, args):
  """Check the command line before deciding to run or to exit.

  The node daemon takes no positional arguments. If any are given, a
  usage message is written to stderr and the process exits with
  constants.EXIT_FAILURE.

  @param args: positional command line arguments

  """
  if not args:
    return

  # noded doesn't take any arguments
  sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                   sys.argv[0])
  sys.exit(constants.EXIT_FAILURE)
946
def PrepNoded(options, _):
  """Prepare the node daemon; executed with the PID file held.

  Selects the request executor class, reads the SSL parameters, tries to
  prepare the job queue lock and starts the HTTP server.

  @param options: parsed options; mlock, ssl, ssl_key, ssl_cert,
      bind_address and port are read
  @return: tuple of (mainloop, server)

  """
  # The plain executor is used unless memory locking is requested and
  # locking the current process actually worked
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
    else:
      request_executor_class = MlockallRequestExecutor

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  queue_error = _PrepareQueueLock()
  if queue_error is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_error)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  return (mainloop, server)
982
def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
  """Run the node daemon's main loop; executed with the PID file held.

  @param options: not used
  @param args: not used
  @param prep_data: the (mainloop, server) tuple returned by PrepNoded

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # The server is stopped whether the main loop returns or raises
    server.Stop()
def Main():
  """Entry point of the node daemon.

  Builds the option parser, including the --no-mlock option, and hands
  control to daemon.GenericMain together with the check, prepare and
  exec functions.

  """
  usage = "%prog [-f] [-d] [-p port] [-b ADDRESS]"
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION

  parser = OptionParser(description="Ganeti node daemon",
                        usage=usage, version=version)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # The same file is passed as default for both certificate and key
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
1011