1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server
53
54
# Module-level job queue lock object; initialised lazily by _PrepareQueueLock().
queue_lock = None

# NOTE(review): the "def _PrepareQueueLock():" header line is missing from
# this chunk; below are its docstring and body.
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  # Already initialised by a previous call; nothing to do.
  if queue_lock is not None:
    return None

  try:
    # must_lock=False: only initialise/verify the queue, do not keep it locked.
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    # The error is returned (not raised) so callers can decide how to react.
    return err
75
78 """Decorator for job queue manipulating functions.
79
80 """
81 QUEUE_LOCK_TIMEOUT = 10
82
83 def wrapper(*args, **kwargs):
84
85
86 if _PrepareQueueLock() is not None:
87 raise errors.JobQueueError("Job queue failed initialization,"
88 " cannot update jobs")
89 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90 try:
91 return fn(*args, **kwargs)
92 finally:
93 queue_lock.Unlock()
94
95 return wrapper
96
111
114 """Custom Request Executor class that ensures NodeHttpServer children are
115 locked in ram.
116
117 """
122
125 """The server implementation.
126
127 This class holds all methods exposed over the RPC interface.
128
129 """
130
131
132
136
138 """Handle a request.
139
140 """
141 if req.request_method.upper() != http.HTTP_PUT:
142 raise http.HttpBadRequest()
143
144 path = req.request_path
145 if path.startswith("/"):
146 path = path[1:]
147
148 method = getattr(self, "perspective_%s" % path, None)
149 if method is None:
150 raise http.HttpNotFound()
151
152 try:
153 result = (True, method(serializer.LoadJson(req.request_body)))
154
155 except backend.RPCFail, err:
156
157
158
159 result = (False, str(err))
160 except errors.QuitGanetiException, err:
161
162 logging.info("Shutting down the node daemon, arguments: %s",
163 str(err.args))
164 os.kill(self.noded_pid, signal.SIGTERM)
165
166
167 result = err.args
168 except Exception, err:
169 logging.exception("Error in RPC call")
170 result = (False, "Error while executing backend function: %s" % str(err))
171
172 return serializer.DumpJson(result, indent=False)
173
174
175
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk; the bare decorators below belong to methods whose
  # bodies are elided entirely.
    """Create a block device.

    """
    # params: (serialized disk, size, owner, on_primary, info)
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    # FromDict yields None on bad input; reject if any element failed.
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)
273
  @staticmethod

  @staticmethod
  # NOTE(review): the bare decorator above belongs to a method elided from
  # this chunk; the def lines of the methods below are also missing.
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    node_name = netutils.Hostname.GetSysName()

    # Only the disks assigned to this node (keyed by its name) are queried.
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in node_disks.get(node_name, [])]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        # On success, status is an object that must be serialized for the wire.
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod
    """Compute the sizes of the given block devices.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
366
367
368
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk.
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)
405
  @staticmethod

  @staticmethod

  @staticmethod
  # NOTE(review): the bare decorators above belong to methods elided from
  # this chunk; the def lines of the methods below are also missing.
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)
460
461
462
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk.
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod

  @staticmethod
    """Get list of storage units.

    """
    # su_name selects the storage backend, su_args are its constructor args.
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
503
504
505
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk.
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  @staticmethod
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)
526
  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod
  # NOTE(review): the bare decorators above belong to methods elided from
  # this chunk; the def line of the method below is also missing.
    """Do a TcpPing on the remote node.

    """
    # params: (source, target, port, timeout, live_port_needed)
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
                            live_port_needed=params[4], source=params[0])
636
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk; bare decorators belong to methods elided entirely.
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
    """Modify a node entry in /etc/hosts.

    """
    # params: (mode, host, ip)
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod

  @staticmethod
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)
711
712
713
714
  @staticmethod

  @staticmethod
  # NOTE(review): the bare decorators in this section belong to methods
  # elided from this chunk; the def lines of the methods below are missing.
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod

  @staticmethod

  @staticmethod

  @staticmethod
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
    """Run a given OS' validation routine.

    """
    # NOTE: the last element rebinds the name "params" to the OS parameters.
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)
784
785
786
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk.
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  @staticmethod
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  @staticmethod
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      # Propagate the failure as a regular RPC failure message.
      raise backend.RPCFail(rval)
    return rval
819
820
821
  @staticmethod
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk.
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)
847
848
849
  @staticmethod
  @_RequireJobQueueLock
  # NOTE(review): the def lines of the methods in this section are missing
  # from this chunk; the decorator pair mid-section belongs to an elided
  # method.
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock

  @staticmethod
  @_RequireJobQueueLock
    """Rename a job queue file.

    """
    # params is a list of (old, new) path pairs.
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
885
886
887
  @staticmethod

  @staticmethod

  @staticmethod
  # NOTE(review): the bare decorators above belong to methods elided from
  # this chunk; the def lines of the methods below are also missing.
    """Starts an import daemon.

    """
    (opts_s, instance, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    # IEM_IMPORT: no remote host/port; the daemon listens for the data.
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    # IEM_EXPORT: connects out to the given host/port to push the data.
    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))
937
  @staticmethod

  @staticmethod

  @staticmethod

# NOTE(review): the three bare decorators above belong to methods elided
# from this chunk, and the "def CheckNoded(...):" header line is missing;
# below are its docstring and body.
  """Initial checks whether to run or exit with a failure.

  """
  # noded takes no positional arguments; anything given is a usage error.
  if args:
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # Verify the string-escape codec is available before starting up.
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
975
978 """Preparation node daemon function, executed with the PID file held.
979
980 """
981 if options.mlock:
982 request_executor_class = MlockallRequestExecutor
983 try:
984 utils.Mlockall()
985 except errors.NoCtypesError:
986 logging.warning("Cannot set memory lock, ctypes module not found")
987 request_executor_class = http.server.HttpServerRequestExecutor
988 else:
989 request_executor_class = http.server.HttpServerRequestExecutor
990
991
992 if options.ssl:
993 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
994 ssl_cert_path=options.ssl_cert)
995 else:
996 ssl_params = None
997
998 err = _PrepareQueueLock()
999 if err is not None:
1000
1001
1002
1003 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1004
1005 mainloop = daemon.Mainloop()
1006 server = NodeHttpServer(mainloop, options.bind_address, options.port,
1007 ssl_params=ssl_params, ssl_verify_peer=True,
1008 request_executor_class=request_executor_class)
1009 server.Start()
1010 return (mainloop, server)
1011
1012
def ExecNoded(options, args, prep_data):  # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options (unused here)
  @param args: positional arguments (unused here)
  @param prep_data: the (mainloop, server) tuple returned by PrepNoded

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # Always stop the HTTP server, even if the main loop raised.
    server.Stop()
1022
1025 """Main function for the node daemon.
1026
1027 """
1028 parser = OptionParser(description="Ganeti node daemon",
1029 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1030 version="%%prog (ganeti) %s" %
1031 constants.RELEASE_VERSION)
1032 parser.add_option("--no-mlock", dest="mlock",
1033 help="Do not mlock the node memory in ram",
1034 default=True, action="store_false")
1035
1036 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1037 default_ssl_cert=constants.NODED_CERT_FILE,
1038 default_ssl_key=constants.NODED_CERT_FILE,
1039 console_logging=True)
1040