
Source Code for Module ganeti.server.noded

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# pylint: disable=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match

# W0142: Used * or ** magic, since we do use it extensively in this
# module

import os
import sys
import logging
import signal
import codecs

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server  # pylint: disable=W0611


queue_lock = None


def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality
  """
  assert trail is not None
  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  trail.append((trail_source, reason, utils.EpochNano()))
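
# A minimal usage sketch, assuming constants.OPCODE_REASON_SRC_NODED expands
# to something like "gnt:daemon:noded":
#
#   trail = []
#   _extendReasonTrail(trail, "shutdown")
#
# would leave trail holding a single entry of the form
# ("gnt:daemon:noded:shutdown", "", <wall-clock time in nanoseconds>).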


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock  # pylint: disable=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  return ieioargs
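
# A minimal sketch of the two decoded shapes, assuming disk_dict is a
# serialized disk as produced by objects.Disk.ToDict() and script_arg is a
# placeholder for the second, opaque argument:
#
#   _DecodeImportExportIO(constants.IEIO_RAW_DISK, [disk_dict])
#     -> (objects.Disk instance, )
#   _DecodeImportExportIO(constants.IEIO_SCRIPT, [disk_dict, script_arg])
#     -> (objects.Disk instance, script_arg)
#
# Any other ieio value is passed through unchanged.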


def _DefaultAlternative(value, default):
  """Returns value or, if evaluating to False, a default value.

  Returns the given value, unless it evaluates to False. In the latter case
  the default value is returned.

  @param value: Value to return if it doesn't evaluate to False
  @param default: Default value
  @return: Given value or the default

  """
  if value:
    return value

  return default


class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)


class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
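
  # A minimal dispatch sketch: the request object below only mimics the
  # attributes HandleRequest reads; real requests are built by
  # ganeti.http.server.
  #
  #   class _FakeRequest(object):
  #     request_method = "POST"
  #     request_path = "/version"
  #     request_body = serializer.DumpJson([])
  #
  #   NodeRequestHandler().HandleRequest(_FakeRequest())
  #
  # looks up perspective_version below and returns the JSON encoding of
  # (True, constants.PROTOCOL_VERSION).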

  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
                                  excl_stor)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params[0]]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    excl_stor = params[4]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getdimensions(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetdimensions(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node_ip, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node_ip, dest_path, cluster_name)

  @staticmethod
  def perspective_blockdev_setinfo(params):
    """Sets metadata information on the given block device.

    """
    (disk, info) = params
    disk = objects.Disk.FromDict(disk)
    return backend.BlockdevSetInfo(disk, info)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, target_node_uuid = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synced.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, target_node_uuid = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)

  @staticmethod
  def perspective_drbd_needs_activation(params):
    """Checks if the drbd devices need activation.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, target_node_uuid = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdNeedsActivation(target_node_uuid, nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------
  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return container.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return container.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return container.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    trail = params[2]
    _extendReasonTrail(trail, "shutdown")
    return backend.InstanceShutdown(instance, timeout, trail)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    (instance_name, startup_paused, trail) = params
    instance = objects.Instance.FromDict(instance_name)
    _extendReasonTrail(trail, "start")
    return backend.StartInstance(instance, startup_paused, trail)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_instance_finalize_migration_dst(params):
    """Finalize the instance migration on the destination node.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationDst(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    cluster_name, instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(cluster_name, instance, target, live)

  @staticmethod
  def perspective_instance_finalize_migration_src(params):
    """Finalize the instance migration on the source node.

    """
    instance, success, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationSource(instance, success, live)

  @staticmethod
  def perspective_instance_get_migration_status(params):
    """Reports migration status.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetMigrationStatus(instance).ToDict()

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    trail = params[3]
    _extendReasonTrail(trail, "reboot")
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
                                  trail)

  @staticmethod
  def perspective_instance_balloon_memory(params):
    """Modify instance runtime memory.

    """
    instance_dict, memory = params
    instance = objects.Instance.FromDict(instance_dict)
    return backend.InstanceBalloonMemory(instance, memory)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    (instance_name, hypervisor_name, hvparams) = params
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    (hypervisor_list, all_hvparams) = params
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    (hypervisor_list, hvparams) = params
    return backend.GetInstanceList(hypervisor_list, hvparams)

  # node --------------------------

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (storage_units, hv_specs) = params
    return backend.GetNodeInfo(storage_units, hv_specs)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    (what, cluster_name, hvparams) = params
    return backend.VerifyNode(what, cluster_name, hvparams)

  @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    """
    # So far it's the same as the normal node_verify
    return cls.perspective_node_verify(params)

  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    return backend.StartMasterDaemons(params[0])

  @staticmethod
  def perspective_node_activate_master_ip(params):
    """Activate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.ActivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_deactivate_master_ip(params):
    """Deactivate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.DeactivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    (hypervisor_type, hvparams) = params
    return backend.PowercycleNode(hypervisor_type, hvparams)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_restricted_command(params):
    """Runs a restricted command.

    """
    (cmd, ) = params

    return backend.RunRestrictedCmd(cmd)

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return ssconf.WriteSsconfFiles(values)

  @staticmethod
  def perspective_get_watcher_pause(params):
    """Get watcher pause end.

    """
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)

  @staticmethod
  def perspective_set_watcher_pause(params):
    """Set watcher pause.

    """
    (until, ) = params
    return backend.SetWatcherPause(until)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # extstorage -----------------------

  @staticmethod
  def perspective_extstorage_diagnose(params):
    """Query detailed information about existing extstorage providers.

    """
    return backend.DiagnoseExtStorage()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set job queue's drain flag.

    """
    (flag, ) = params

    return jstore.SetDrainFlag(flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes an X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:  # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data):  # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)