22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server
53
54
queue_lock = None


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    return None

  # Prepare the job queue lock; failures are reported to the caller
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Lock the queue in exclusive, blocking mode; several child processes
    # may try to update it at the same time
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
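

# Minimal reconstruction sketch: the import/export RPCs below pass their I/O
# arguments through _DecodeImportExportIO(). This version assumes the
# constants.IEIO_RAW_DISK and constants.IEIO_SCRIPT I/O types and may differ
# in detail from the original helper.
def _DecodeImportExportIO(ieio, ieioargs):
  """Decode import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    # a single serialized disk object
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    # a serialized disk object plus the disk index
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  # anything else (e.g. plain file I/O) is passed through unchanged
  return ieioargs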
114 """Custom Request Executor class that ensures NodeHttpServer children are
115 locked in ram.
116
117 """
122
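  # Minimal constructor sketch, assuming the executor only needs to lock the
  # process memory in RAM before delegating to the parent class.
  def __init__(self, *args, **kwargs):
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)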
125 """The server implementation.
126
127 This class holds all methods exposed over the RPC interface.
128
129 """
130
131
132
136
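  # Minimal constructor sketch: the daemon PID is remembered so that the
  # QuitGanetiException handler in HandleRequest below can signal the process.
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()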
138 """Handle a request.
139
140 """
141
142 if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
143 raise http.HttpBadRequest("Only PUT and POST methods are supported")
144
145 path = req.request_path
146 if path.startswith("/"):
147 path = path[1:]
148
149 method = getattr(self, "perspective_%s" % path, None)
150 if method is None:
151 raise http.HttpNotFound()
152
153 try:
154 result = (True, method(serializer.LoadJson(req.request_body)))
155
156 except backend.RPCFail, err:
157
158
159
160 result = (False, str(err))
161 except errors.QuitGanetiException, err:
162
163 logging.info("Shutting down the node daemon, arguments: %s",
164 str(err.args))
165 os.kill(self.noded_pid, signal.SIGTERM)
166
167
168 result = err.args
169 except Exception, err:
170 logging.exception("Error in RPC call")
171 result = (False, "Error while executing backend function: %s" % str(err))
172
173 return serializer.DumpJson(result, indent=False)
174
175
176
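  # Request dispatch: HandleRequest above maps a request for "/<name>" to the
  # static method "perspective_<name>", so e.g. a POST to "/blockdev_create"
  # is handled by perspective_blockdev_create below.
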
  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    node_name = netutils.Hostname.GetSysName()

    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in node_disks.get(node_name, [])]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks; if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Return the sizes of the given block devices.

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_storage_list(params):
    """Get the list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all given bridges exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
                            live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Check if this node has the given IP address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volumes on this node.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Try to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  @staticmethod
  def perspective_test_delay(params):
    """Run a test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update a job queue file.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # params is a list of (old, new) name pairs; rename them all
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))
971 """Initial checks whether to run or exit with a failure.
972
973 """
974 if args:
975 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
976 sys.argv[0])
977 sys.exit(constants.EXIT_FAILURE)
978 try:
979 codecs.lookup("string-escape")
980 except LookupError:
981 print >> sys.stderr, ("Can't load the string-escape code which is part"
982 " of the Python installation. Is your installation"
983 " complete/correct? Aborting.")
984 sys.exit(constants.EXIT_FAILURE)


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate and key, if configured
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  return (mainloop, server)


def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
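

# Usage sketch (hypothetical wrapper, not part of the original module): the
# noded entry-point script is only expected to import this module and call
# Main(), for example:
#
#   from ganeti.server import noded  # hypothetical module path
#   noded.Main()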