Package ganeti :: Package server :: Module noded
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.server.noded

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Ganeti node daemon""" 
  23   
  24  # pylint: disable=C0103,W0142 
  25   
  26  # C0103: Functions in this module need to have a given name structure, 
  27  # and the name of the daemon doesn't match 
  28   
  29  # W0142: Used * or ** magic, since we do use it extensively in this 
  30  # module 
  31   
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
  55   
  56   
  57  queue_lock = None 
def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  Appends one entry naming the noded functionality (prefixed with the
  noded reason source), the given reason and the current time in
  nanoseconds since the epoch.

  @param trail: the reason trail list to extend (modified in place)
  @param source: name of the noded functionality being recorded
  @param reason: optional free-form reason text

  """
  assert trail is not None
  entry = ("%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source),
           reason,
           utils.EpochNano())
  trail.append(entry)
69 70 -def _PrepareQueueLock():
71 """Try to prepare the queue lock. 72 73 @return: None for success, otherwise an exception object 74 75 """ 76 global queue_lock # pylint: disable=W0603 77 78 if queue_lock is not None: 79 return None 80 81 # Prepare job queue 82 try: 83 queue_lock = jstore.InitAndVerifyQueue(must_lock=False) 84 return None 85 except EnvironmentError, err: 86 return err
87
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue lock is initialized and then held in exclusive,
  blocking mode (with a timeout) around the call to C{fn}.

  @param fn: function to wrap
  @return: the wrapped function
  @raise errors.JobQueueError: if the job queue failed initialization

  """
  QUEUE_LOCK_TIMEOUT = 10

  # functools.wraps preserves the wrapped function's name/docstring,
  # which keeps introspection and log messages meaningful
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  Serialized disk objects in the arguments are restored to real
  objects; anything else is passed through unchanged.

  @param ieio: the I/O type constant
  @param ieioargs: the I/O arguments as received over the wire
  @return: the decoded argument tuple

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    decoded = (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    decoded = (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    decoded = ieioargs
  return decoded
def _DefaultAlternative(value, default):
  """Returns value or, if evaluating to False, a default value.

  @param value: Value to return if it doesn't evaluate to False
  @param default: Default value
  @return: Given value or the default

  """
  return value or default
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock the process' pages into RAM before handling the request, so
    # request processing cannot be stalled by swapping
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
151 152 -class NodeRequestHandler(http.server.HttpServerHandler):
153 """The server implementation. 154 155 This class holds all methods exposed over the RPC interface. 156 157 """ 158 # too many public methods, and unused args - all methods get params 159 # due to the API 160 # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember the daemon's PID; HandleRequest uses it to deliver
    # SIGTERM when a QuitGanetiException asks for a shutdown
    self.noded_pid = os.getpid()
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the matching C{perspective_*} method,
    deserializing the body from JSON and serializing the
    (success, payload) result back to JSON.

    @param req: the HTTP request object
    @return: serialized result as JSON
    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no handler matches the request path

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # The request path (minus the leading slash) selects the handler
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # Catch-all boundary: log the traceback and report failure to the
      # RPC client instead of crashing the daemon
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
201 202 # the new block devices -------------------------- 203 204 @staticmethod
205 - def perspective_blockdev_create(params):
206 """Create a block device. 207 208 """ 209 (bdev_s, size, owner, on_primary, info, excl_stor) = params 210 bdev = objects.Disk.FromDict(bdev_s) 211 if bdev is None: 212 raise ValueError("can't unserialize data!") 213 return backend.BlockdevCreate(bdev, size, owner, on_primary, info, 214 excl_stor)
215 216 @staticmethod
218 """Pause/resume sync of a block device. 219 220 """ 221 disks_s, pause = params 222 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s] 223 return backend.BlockdevPauseResumeSync(disks, pause)
224 225 @staticmethod
226 - def perspective_blockdev_wipe(params):
227 """Wipe a block device. 228 229 """ 230 bdev_s, offset, size = params 231 bdev = objects.Disk.FromDict(bdev_s) 232 return backend.BlockdevWipe(bdev, offset, size)
233 234 @staticmethod
235 - def perspective_blockdev_remove(params):
236 """Remove a block device. 237 238 """ 239 bdev_s = params[0] 240 bdev = objects.Disk.FromDict(bdev_s) 241 return backend.BlockdevRemove(bdev)
242 243 @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a list of block devices.

    Each element of params[0] is a (serialized disk, new unique id)
    pair.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
250 251 @staticmethod
252 - def perspective_blockdev_assemble(params):
253 """Assemble a block device. 254 255 """ 256 bdev_s, owner, on_primary, idx = params 257 bdev = objects.Disk.FromDict(bdev_s) 258 if bdev is None: 259 raise ValueError("can't unserialize data!") 260 return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261 262 @staticmethod
263 - def perspective_blockdev_shutdown(params):
264 """Shutdown a block device. 265 266 """ 267 bdev_s = params[0] 268 bdev = objects.Disk.FromDict(bdev_s) 269 if bdev is None: 270 raise ValueError("can't unserialize data!") 271 return backend.BlockdevShutdown(bdev)
272 273 @staticmethod
275 """Add a child to a mirror device. 276 277 Note: this is only valid for mirror devices. It's the caller's duty 278 to send a correct disk, otherwise we raise an error. 279 280 """ 281 bdev_s, ndev_s = params 282 bdev = objects.Disk.FromDict(bdev_s) 283 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 284 if bdev is None or ndevs.count(None) > 0: 285 raise ValueError("can't unserialize data!") 286 return backend.BlockdevAddchildren(bdev, ndevs)
287 288 @staticmethod
290 """Remove a child from a mirror device. 291 292 This is only valid for mirror devices, of course. It's the callers 293 duty to send a correct disk, otherwise we raise an error. 294 295 """ 296 bdev_s, ndev_s = params 297 bdev = objects.Disk.FromDict(bdev_s) 298 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] 299 if bdev is None or ndevs.count(None) > 0: 300 raise ValueError("can't unserialize data!") 301 return backend.BlockdevRemovechildren(bdev, ndevs)
302 303 @staticmethod
305 """Return the mirror status for a list of disks. 306 307 """ 308 disks = [objects.Disk.FromDict(dsk_s) 309 for dsk_s in params[0]] 310 return [status.ToDict() 311 for status in backend.BlockdevGetmirrorstatus(disks)]
312 313 @staticmethod
315 """Return the mirror status for a list of disks. 316 317 """ 318 (node_disks, ) = params 319 320 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] 321 322 result = [] 323 324 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks): 325 if success: 326 result.append((success, status.ToDict())) 327 else: 328 result.append((success, status)) 329 330 return result
331 332 @staticmethod
333 - def perspective_blockdev_find(params):
334 """Expose the FindBlockDevice functionality for a disk. 335 336 This will try to find but not activate a disk. 337 338 """ 339 disk = objects.Disk.FromDict(params[0]) 340 341 result = backend.BlockdevFind(disk) 342 if result is None: 343 return None 344 345 return result.ToDict()
346 347 @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)
358 359 @staticmethod
360 - def perspective_blockdev_grow(params):
361 """Grow a stack of devices. 362 363 """ 364 if len(params) < 4: 365 raise ValueError("Received only 3 parameters in blockdev_grow," 366 " old master?") 367 cfbd = objects.Disk.FromDict(params[0]) 368 amount = params[1] 369 dryrun = params[2] 370 backingstore = params[3] 371 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
372 373 @staticmethod
374 - def perspective_blockdev_close(params):
375 """Closes the given block devices. 376 377 """ 378 disks = [objects.Disk.FromDict(cf) for cf in params[1]] 379 return backend.BlockdevClose(params[0], disks)
380 381 @staticmethod
382 - def perspective_blockdev_getsize(params):
383 """Compute the sizes of the given block devices. 384 385 """ 386 disks = [objects.Disk.FromDict(cf) for cf in params[0]] 387 return backend.BlockdevGetsize(disks)
388 389 @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
397 398 @staticmethod
399 - def perspective_blockdev_setinfo(params):
400 """Sets metadata information on the given block device. 401 402 """ 403 (disk, info) = params 404 disk = objects.Disk.FromDict(disk) 405 return backend.BlockdevSetInfo(disk, info)
406 407 # blockdev/drbd specific methods ---------- 408 409 @staticmethod
411 """Disconnects the network connection of drbd disks. 412 413 Note that this is only valid for drbd disks, so the members of the 414 disk list must all be drbd devices. 415 416 """ 417 nodes_ip, disks = params 418 disks = [objects.Disk.FromDict(cf) for cf in disks] 419 return backend.DrbdDisconnectNet(nodes_ip, disks)
420 421 @staticmethod
422 - def perspective_drbd_attach_net(params):
423 """Attaches the network connection of drbd disks. 424 425 Note that this is only valid for drbd disks, so the members of the 426 disk list must all be drbd devices. 427 428 """ 429 nodes_ip, disks, instance_name, multimaster = params 430 disks = [objects.Disk.FromDict(cf) for cf in disks] 431 return backend.DrbdAttachNet(nodes_ip, disks, 432 instance_name, multimaster)
433 434 @staticmethod
435 - def perspective_drbd_wait_sync(params):
436 """Wait until DRBD disks are synched. 437 438 Note that this is only valid for drbd disks, so the members of the 439 disk list must all be drbd devices. 440 441 """ 442 nodes_ip, disks = params 443 disks = [objects.Disk.FromDict(cf) for cf in disks] 444 return backend.DrbdWaitSync(nodes_ip, disks)
445 446 @staticmethod
447 - def perspective_drbd_helper(params):
448 """Query drbd helper. 449 450 """ 451 return backend.GetDrbdUsermodeHelper()
452 453 # export/import -------------------------- 454 455 @staticmethod
456 - def perspective_finalize_export(params):
457 """Expose the finalize export functionality. 458 459 """ 460 instance = objects.Instance.FromDict(params[0]) 461 462 snap_disks = [] 463 for disk in params[1]: 464 if isinstance(disk, bool): 465 snap_disks.append(disk) 466 else: 467 snap_disks.append(objects.Disk.FromDict(disk)) 468 469 return backend.FinalizeExport(instance, snap_disks)
470 471 @staticmethod
472 - def perspective_export_info(params):
473 """Query information about an existing export on this node. 474 475 The given path may not contain an export, in which case we return 476 None. 477 478 """ 479 path = params[0] 480 return backend.ExportInfo(path)
481 482 @staticmethod
483 - def perspective_export_list(params):
484 """List the available exports on this node. 485 486 Note that as opposed to export_info, which may query data about an 487 export in any path, this only queries the standard Ganeti path 488 (pathutils.EXPORT_DIR). 489 490 """ 491 return backend.ListExports()
492 493 @staticmethod
494 - def perspective_export_remove(params):
495 """Remove an export. 496 497 """ 498 export = params[0] 499 return backend.RemoveExport(export)
500 501 # block device --------------------- 502 @staticmethod
503 - def perspective_bdev_sizes(params):
504 """Query the list of block devices 505 506 """ 507 devices = params[0] 508 return backend.GetBlockDevSizes(devices)
509 510 # volume -------------------------- 511 512 @staticmethod
513 - def perspective_lv_list(params):
514 """Query the list of logical volumes in a given volume group. 515 516 """ 517 vgname = params[0] 518 return backend.GetVolumeList(vgname)
519 520 @staticmethod
521 - def perspective_vg_list(params):
522 """Query the list of volume groups. 523 524 """ 525 return backend.ListVolumeGroups()
526 527 # Storage -------------------------- 528 529 @staticmethod
530 - def perspective_storage_list(params):
531 """Get list of storage units. 532 533 """ 534 (su_name, su_args, name, fields) = params 535 return storage.GetStorage(su_name, *su_args).List(name, fields)
536 537 @staticmethod
538 - def perspective_storage_modify(params):
539 """Modify a storage unit. 540 541 """ 542 (su_name, su_args, name, changes) = params 543 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
544 545 @staticmethod
546 - def perspective_storage_execute(params):
547 """Execute an operation on a storage unit. 548 549 """ 550 (su_name, su_args, name, op) = params 551 return storage.GetStorage(su_name, *su_args).Execute(name, op)
552 553 # bridge -------------------------- 554 555 @staticmethod
556 - def perspective_bridges_exist(params):
557 """Check if all bridges given exist on this node. 558 559 """ 560 bridges_list = params[0] 561 return backend.BridgesExist(bridges_list)
562 563 # instance -------------------------- 564 565 @staticmethod
566 - def perspective_instance_os_add(params):
567 """Install an OS on a given instance. 568 569 """ 570 inst_s = params[0] 571 inst = objects.Instance.FromDict(inst_s) 572 reinstall = params[1] 573 debug = params[2] 574 return backend.InstanceOsAdd(inst, reinstall, debug)
575 576 @staticmethod
578 """Runs the OS rename script for an instance. 579 580 """ 581 inst_s, old_name, debug = params 582 inst = objects.Instance.FromDict(inst_s) 583 return backend.RunRenameInstance(inst, old_name, debug)
584 585 @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    trail = params[2]
    # Record that noded triggered the shutdown in the reason trail
    _extendReasonTrail(trail, "shutdown")
    return backend.InstanceShutdown(instance, timeout, trail)
595 596 @staticmethod
597 - def perspective_instance_start(params):
598 """Start an instance. 599 600 """ 601 (instance_name, startup_paused, trail) = params 602 instance = objects.Instance.FromDict(instance_name) 603 _extendReasonTrail(trail, "start") 604 return backend.StartInstance(instance, startup_paused, trail)
605 606 @staticmethod
607 - def perspective_migration_info(params):
608 """Gather information about an instance to be migrated. 609 610 """ 611 instance = objects.Instance.FromDict(params[0]) 612 return backend.MigrationInfo(instance)
613 614 @staticmethod
615 - def perspective_accept_instance(params):
616 """Prepare the node to accept an instance. 617 618 """ 619 instance, info, target = params 620 instance = objects.Instance.FromDict(instance) 621 return backend.AcceptInstance(instance, info, target)
622 623 @staticmethod
625 """Finalize the instance migration on the destination node. 626 627 """ 628 instance, info, success = params 629 instance = objects.Instance.FromDict(instance) 630 return backend.FinalizeMigrationDst(instance, info, success)
631 632 @staticmethod
633 - def perspective_instance_migrate(params):
634 """Migrates an instance. 635 636 """ 637 instance, target, live = params 638 instance = objects.Instance.FromDict(instance) 639 return backend.MigrateInstance(instance, target, live)
640 641 @staticmethod
643 """Finalize the instance migration on the source node. 644 645 """ 646 instance, success, live = params 647 instance = objects.Instance.FromDict(instance) 648 return backend.FinalizeMigrationSource(instance, success, live)
649 650 @staticmethod
652 """Reports migration status. 653 654 """ 655 instance = objects.Instance.FromDict(params[0]) 656 return backend.GetMigrationStatus(instance).ToDict()
657 658 @staticmethod
659 - def perspective_instance_reboot(params):
660 """Reboot an instance. 661 662 """ 663 instance = objects.Instance.FromDict(params[0]) 664 reboot_type = params[1] 665 shutdown_timeout = params[2] 666 trail = params[3] 667 _extendReasonTrail(trail, "reboot") 668 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, 669 trail)
670 671 @staticmethod
673 """Modify instance runtime memory. 674 675 """ 676 instance_dict, memory = params 677 instance = objects.Instance.FromDict(instance_dict) 678 return backend.InstanceBalloonMemory(instance, memory)
679 680 @staticmethod
681 - def perspective_instance_info(params):
682 """Query instance information. 683 684 """ 685 return backend.GetInstanceInfo(params[0], params[1])
686 687 @staticmethod
689 """Query whether the specified instance can be migrated. 690 691 """ 692 instance = objects.Instance.FromDict(params[0]) 693 return backend.GetInstanceMigratable(instance)
694 695 @staticmethod
697 """Query information about all instances. 698 699 """ 700 return backend.GetAllInstancesInfo(params[0])
701 702 @staticmethod
703 - def perspective_instance_list(params):
704 """Query the list of running instances. 705 706 """ 707 return backend.GetInstanceList(params[0])
708 709 # node -------------------------- 710 711 @staticmethod
713 """Checks if a node has the given ip address. 714 715 """ 716 return netutils.IPAddress.Own(params[0])
717 718 @staticmethod
719 - def perspective_node_info(params):
720 """Query node information. 721 722 """ 723 (vg_names, hv_names, excl_stor) = params 724 return backend.GetNodeInfo(vg_names, hv_names, excl_stor)
725 726 @staticmethod
727 - def perspective_etc_hosts_modify(params):
728 """Modify a node entry in /etc/hosts. 729 730 """ 731 backend.EtcHostsModify(params[0], params[1], params[2]) 732 733 return True
734 735 @staticmethod
736 - def perspective_node_verify(params):
737 """Run a verify sequence on this node. 738 739 """ 740 return backend.VerifyNode(params[0], params[1])
741 742 @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    Currently delegates to the normal node verification.

    """
    # So far it's the same as the normal node_verify
    return cls.perspective_node_verify(params)
749 750 @staticmethod
752 """Start the master daemons on this node. 753 754 """ 755 return backend.StartMasterDaemons(params[0])
756 757 @staticmethod
759 """Activate the master IP on this node. 760 761 """ 762 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 763 return backend.ActivateMasterIp(master_params, params[1])
764 765 @staticmethod
767 """Deactivate the master IP on this node. 768 769 """ 770 master_params = objects.MasterNetworkParameters.FromDict(params[0]) 771 return backend.DeactivateMasterIp(master_params, params[1])
772 773 @staticmethod
774 - def perspective_node_stop_master(params):
775 """Stops master daemons on this node. 776 777 """ 778 return backend.StopMasterDaemons()
779 780 @staticmethod
782 """Change the master IP netmask. 783 784 """ 785 return backend.ChangeMasterNetmask(params[0], params[1], params[2], 786 params[3])
787 788 @staticmethod
790 """Cleanup after leaving a cluster. 791 792 """ 793 return backend.LeaveCluster(params[0])
794 795 @staticmethod
796 - def perspective_node_volumes(params):
797 """Query the list of all logical volume groups. 798 799 """ 800 return backend.NodeVolumes()
801 802 @staticmethod
804 """Demote a node from the master candidate role. 805 806 """ 807 return backend.DemoteFromMC()
808 809 @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)
816 817 # cluster -------------------------- 818 819 @staticmethod
820 - def perspective_version(params):
821 """Query version information. 822 823 """ 824 return constants.PROTOCOL_VERSION
825 826 @staticmethod
827 - def perspective_upload_file(params):
828 """Upload a file. 829 830 Note that the backend implementation imposes strict rules on which 831 files are accepted. 832 833 """ 834 return backend.UploadFile(*(params[0]))
835 836 @staticmethod
837 - def perspective_master_info(params):
838 """Query master information. 839 840 """ 841 return backend.GetMasterInfo()
842 843 @staticmethod
844 - def perspective_run_oob(params):
845 """Runs oob on node. 846 847 """ 848 output = backend.RunOob(params[0], params[1], params[2], params[3]) 849 if output: 850 result = serializer.LoadJson(output) 851 else: 852 result = None 853 return result
854 855 @staticmethod
857 """Runs a restricted command. 858 859 """ 860 (cmd, ) = params 861 862 return backend.RunRestrictedCmd(cmd)
863 864 @staticmethod
866 """Write ssconf files. 867 868 """ 869 (values,) = params 870 return ssconf.WriteSsconfFiles(values)
871 872 @staticmethod
873 - def perspective_get_watcher_pause(params):
874 """Get watcher pause end. 875 876 """ 877 return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
878 879 @staticmethod
880 - def perspective_set_watcher_pause(params):
881 """Set watcher pause. 882 883 """ 884 (until, ) = params 885 return backend.SetWatcherPause(until)
886 887 # os ----------------------- 888 889 @staticmethod
890 - def perspective_os_diagnose(params):
891 """Query detailed information about existing OSes. 892 893 """ 894 return backend.DiagnoseOS()
895 896 @staticmethod
897 - def perspective_os_get(params):
898 """Query information about a given OS. 899 900 """ 901 name = params[0] 902 os_obj = backend.OSFromDisk(name) 903 return os_obj.ToDict()
904 905 @staticmethod
906 - def perspective_os_validate(params):
907 """Run a given OS' validation routine. 908 909 """ 910 required, name, checks, params = params 911 return backend.ValidateOS(required, name, checks, params)
912 913 # extstorage ----------------------- 914 915 @staticmethod
917 """Query detailed information about existing extstorage providers. 918 919 """ 920 return backend.DiagnoseExtStorage()
921 922 # hooks ----------------------- 923 924 @staticmethod
925 - def perspective_hooks_runner(params):
926 """Run hook scripts. 927 928 """ 929 hpath, phase, env = params 930 hr = backend.HooksRunner() 931 return hr.RunHooks(hpath, phase, env)
932 933 # iallocator ----------------- 934 935 @staticmethod
936 - def perspective_iallocator_runner(params):
937 """Run an iallocator script. 938 939 """ 940 name, idata = params 941 iar = backend.IAllocatorRunner() 942 return iar.Run(name, idata)
943 944 # test ----------------------- 945 946 @staticmethod
947 - def perspective_test_delay(params):
948 """Run test delay. 949 950 """ 951 duration = params[0] 952 status, rval = utils.TestDelay(duration) 953 if not status: 954 raise backend.RPCFail(rval) 955 return rval
956 957 # file storage --------------- 958 959 @staticmethod
961 """Create the file storage directory. 962 963 """ 964 file_storage_dir = params[0] 965 return backend.CreateFileStorageDir(file_storage_dir)
966 967 @staticmethod
969 """Remove the file storage directory. 970 971 """ 972 file_storage_dir = params[0] 973 return backend.RemoveFileStorageDir(file_storage_dir)
974 975 @staticmethod
977 """Rename the file storage directory. 978 979 """ 980 old_file_storage_dir = params[0] 981 new_file_storage_dir = params[1] 982 return backend.RenameFileStorageDir(old_file_storage_dir, 983 new_file_storage_dir)
984 985 # jobs ------------------------ 986 987 @staticmethod 988 @_RequireJobQueueLock
989 - def perspective_jobqueue_update(params):
990 """Update job queue. 991 992 """ 993 (file_name, content) = params 994 return backend.JobQueueUpdate(file_name, content)
995 996 @staticmethod 997 @_RequireJobQueueLock
998 - def perspective_jobqueue_purge(params):
999 """Purge job queue. 1000 1001 """ 1002 return backend.JobQueuePurge()
1003 1004 @staticmethod 1005 @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    params[0] is a list of (old, new) file name pairs.

    @return: a list with the result of each individual rename

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]
1012 1013 @staticmethod 1014 @_RequireJobQueueLock
1016 """Set job queue's drain flag. 1017 1018 """ 1019 (flag, ) = params 1020 1021 return jstore.SetDrainFlag(flag)
1022 1023 # hypervisor --------------- 1024 1025 @staticmethod
1027 """Validate the hypervisor parameters. 1028 1029 """ 1030 (hvname, hvparams) = params 1031 return backend.ValidateHVParams(hvname, hvparams)
1032 1033 # Crypto 1034 1035 @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    # params is a single-element tuple holding the certificate validity
    # (presumably in seconds — confirm against the RPC client)
    (validity, ) = params
    return backend.CreateX509Certificate(validity)
1042 1043 @staticmethod
1044 - def perspective_x509_cert_remove(params):
1045 """Removes a X509 certificate. 1046 1047 """ 1048 (name, ) = params 1049 return backend.RemoveX509Certificate(name)
1050 1051 # Import and export 1052 1053 @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    # params: (serialized options, serialized instance, component,
    #          (destination I/O type, destination I/O arguments))
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    # The import side has no remote host/port (hence None, None); the
    # destination I/O arguments are decoded into objects before use
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))
1068 1069 @staticmethod
1070 - def perspective_export_start(params):
1071 """Starts an export daemon. 1072 1073 """ 1074 (opts_s, host, port, instance, component, (source, source_args)) = params 1075 1076 opts = objects.ImportExportOptions.FromDict(opts_s) 1077 1078 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, 1079 host, port, 1080 objects.Instance.FromDict(instance), 1081 component, source, 1082 _DecodeImportExportIO(source, 1083 source_args))
1084 1085 @staticmethod
1086 - def perspective_impexp_status(params):
1087 """Retrieves the status of an import or export daemon. 1088 1089 """ 1090 return backend.GetImportExportStatus(params[0])
1091 1092 @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    # params[0] identifies the import/export daemon to abort; see
    # backend.AbortImportExport for the exact semantics
    return backend.AbortImportExport(params[0])
1098 1099 @staticmethod
1100 - def perspective_impexp_cleanup(params):
1101 """Cleans up after an import or export. 1102 1103 """ 1104 return backend.CleanupImportExport(params[0])
1105
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  @param _: unused options argument (part of the check-function API)
  @param args: leftover command-line arguments; must be empty

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # Verify the "string-escape" codec is available up front; failing
    # here gives a clearer error than failing later at its point of use
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1122
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @param options: parsed command-line options
  @param _: unused arguments (part of the prepare-function API)
  @return: (mainloop, server) tuple, handed over to L{ExecNoded}

  """
  # Choose the request executor: mlock'ed if requested and supported,
  # otherwise the plain one
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1162
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the main loop prepared by L{PrepNoded} and guarantees the HTTP
  server is stopped on the way out, whether the loop ends normally or
  with an exception.

  """
  (mainloop, http_server) = prep_data
  try:
    mainloop.Run()
  finally:
    http_server.Stop()
def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # GenericMain handles daemonization, logging and the run phases
  # (CheckNoded -> PrepNoded -> ExecNoded)
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)
1191