Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
  32  import os 
  33  import sys 
  34  import logging 
  35  import signal 
  36  import codecs 
  37   
  38  from optparse import OptionParser 
  39   
  40  from ganeti import backend 
  41  from ganeti import constants 
  42  from ganeti import objects 
  43  from ganeti import errors 
  44  from ganeti import jstore 
  45  from ganeti import daemon 
  46  from ganeti import http 
  47  from ganeti import utils 
  48  from ganeti import storage 
  49  from ganeti import serializer 
  50  from ganeti import netutils 
  51   
  52  import ganeti.http.server # pylint: disable=W0611 
  53   
  54   
# Module-level job queue lock; initialized lazily by _PrepareQueueLock()
queue_lock = None
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Lazily initializes the module-level C{queue_lock}; calling this more
  than once is safe (subsequent calls are no-ops on success).

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  # Already initialized by an earlier call
  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    # The error is returned (not raised) so callers can decide whether a
    # broken job queue is fatal (see PrepNoded)
    return err
75
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Makes sure the queue lock is initialized and held exclusively for the
  duration of the wrapped call.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _LockedWrapper(*args, **kwargs):
    # There can be several children manipulating the queue at the same
    # time, therefore the lock is taken in exclusive, blocking mode,
    # waiting for up to ten seconds.
    prep_error = _PrepareQueueLock()
    if prep_error is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _LockedWrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  For raw-disk and script I/O the first argument is a serialized disk
  object, which is deserialized here; any other I/O type is passed
  through unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    return ieioargs
111
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock all current and future memory pages of this process in RAM
    # before handling any request
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface. Each
  C{perspective_*} method is looked up by L{HandleRequest} from the HTTP
  request path, receives the JSON-decoded request body as its single
  C{params} argument and must return a JSON-serializable value.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # remembered so QuitGanetiException handling can signal our own process
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    # FIXME: Remove HTTP_PUT in Ganeti 2.7
    if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
      raise http.HttpBadRequest("Only PUT and POST methods are supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # dispatch on the request path: /foo -> self.perspective_foo
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params[0]]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    # status objects are only serializable on success; failures carry the
    # error message directly
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    remove by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    return backend.BlockdevGrow(cfbd, amount, dryrun)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import  --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    # each entry is either a boolean (snapshot failed/skipped) or a
    # serialized disk object
    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------
  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    (instance_name, startup_paused) = params
    instance = objects.Instance.FromDict(instance_name)
    return backend.StartInstance(instance, startup_paused)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_instance_finalize_migration_dst(params):
    """Finalize the instance migration on the destination node.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationDst(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_finalize_migration_src(params):
    """Finalize the instance migration on the source node.

    """
    instance, success, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationSource(instance, success, live)

  @staticmethod
  def perspective_instance_get_migration_status(params):
    """Reports migration status.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetMigrationStatus(instance).ToDict()

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_balloon_memory(params):
    """Modify instance runtime memory.

    """
    instance_dict, memory = params
    instance = objects.Instance.FromDict(instance_dict)
    return backend.InstanceBalloonMemory(instance, memory)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (vg_names, hv_names) = params
    return backend.GetNodeInfo(vg_names, hv_names)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    return backend.StartMasterDaemons(params[0])

  @staticmethod
  def perspective_node_activate_master_ip(params):
    """Activate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.ActivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_deactivate_master_ip(params):
    """Deactivate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.DeactivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with C{constants.EXIT_FAILURE} if positional
  arguments were given or if the Python installation is missing the
  string-escape codec; returns None otherwise.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # sanity-check the Python installation; this codec is needed later on
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1023
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @param options: parsed command-line options
  @return: (mainloop, server) tuple, passed on to L{ExecNoded}

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      # mlock is best-effort: fall back to the plain executor
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1063
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the main loop prepared by L{PrepNoded} and guarantees the HTTP
  server is stopped afterwards.

  """
  mainloop = prep_data[0]
  server = prep_data[1]
  try:
    mainloop.Run()
  finally:
    # stop the server even if the main loop terminated with an error
    server.Stop()
def Main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  startup code.

  """
  opt_parser = OptionParser(
    description="Ganeti node daemon",
    usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
    version="%%prog (ganeti) %s" % constants.RELEASE_VERSION)
  opt_parser.add_option(
    "--no-mlock", dest="mlock", action="store_false", default=True,
    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, opt_parser, CheckNoded, PrepNoded,
                     ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
1092