1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server
53
54
# Global job queue lock (a jstore lock object); lazily initialised by
# _PrepareQueueLock() on first use so that importing this module has no
# side effects.
queue_lock = None
59 """Try to prepare the queue lock.
60
61 @return: None for success, otherwise an exception object
62
63 """
64 global queue_lock
65
66 if queue_lock is not None:
67 return None
68
69
70 try:
71 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72 return None
73 except EnvironmentError, err:
74 return err
75
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the queue lock is initialised and held exclusively for the
  duration of the wrapped call.

  """
  # NOTE(review): the "def" line was missing from this corrupted chunk; the
  # name comes from the @_RequireJobQueueLock uses below and the wrapped
  # callable is referenced as "fn" in the original wrapper body.
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode (with timeout) because several
    # callers may manipulate the queue at the same time.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
96
111
114 """Subclass ensuring request handlers are locked in RAM.
115
116 """
121
124 """The server implementation.
125
126 This class holds all methods exposed over the RPC interface.
127
128 """
129
130
131
135
137 """Handle a request.
138
139 """
140
141 if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
142 raise http.HttpBadRequest("Only PUT and POST methods are supported")
143
144 path = req.request_path
145 if path.startswith("/"):
146 path = path[1:]
147
148 method = getattr(self, "perspective_%s" % path, None)
149 if method is None:
150 raise http.HttpNotFound()
151
152 try:
153 result = (True, method(serializer.LoadJson(req.request_body)))
154
155 except backend.RPCFail, err:
156
157
158
159 result = (False, str(err))
160 except errors.QuitGanetiException, err:
161
162 logging.info("Shutting down the node daemon, arguments: %s",
163 str(err.args))
164 os.kill(self.noded_pid, signal.SIGTERM)
165
166
167 result = err.args
168 except Exception, err:
169 logging.exception("Error in RPC call")
170 result = (False, "Error while executing backend function: %s" % str(err))
171
172 return serializer.DumpJson(result)
173
174
175
176 @staticmethod
178 """Create a block device.
179
180 """
181 bdev_s, size, owner, on_primary, info = params
182 bdev = objects.Disk.FromDict(bdev_s)
183 if bdev is None:
184 raise ValueError("can't unserialize data!")
185 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187 @staticmethod
195
196 @staticmethod
204
205 @staticmethod
213
214 @staticmethod
221
222 @staticmethod
232
233 @staticmethod
243
244 @staticmethod
246 """Add a child to a mirror device.
247
248 Note: this is only valid for mirror devices. It's the caller's duty
249 to send a correct disk, otherwise we raise an error.
250
251 """
252 bdev_s, ndev_s = params
253 bdev = objects.Disk.FromDict(bdev_s)
254 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255 if bdev is None or ndevs.count(None) > 0:
256 raise ValueError("can't unserialize data!")
257 return backend.BlockdevAddchildren(bdev, ndevs)
258
259 @staticmethod
261 """Remove a child from a mirror device.
262
263 This is only valid for mirror devices, of course. It's the callers
264 duty to send a correct disk, otherwise we raise an error.
265
266 """
267 bdev_s, ndev_s = params
268 bdev = objects.Disk.FromDict(bdev_s)
269 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270 if bdev is None or ndevs.count(None) > 0:
271 raise ValueError("can't unserialize data!")
272 return backend.BlockdevRemovechildren(bdev, ndevs)
273
274 @staticmethod
283
284 @staticmethod
286 """Return the mirror status for a list of disks.
287
288 """
289 (node_disks, ) = params
290
291 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
292
293 result = []
294
295 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
296 if success:
297 result.append((success, status.ToDict()))
298 else:
299 result.append((success, status))
300
301 return result
302
303 @staticmethod
305 """Expose the FindBlockDevice functionality for a disk.
306
307 This will try to find but not activate a disk.
308
309 """
310 disk = objects.Disk.FromDict(params[0])
311
312 result = backend.BlockdevFind(disk)
313 if result is None:
314 return None
315
316 return result.ToDict()
317
318 @staticmethod
320 """Create a snapshot device.
321
322 Note that this is only valid for LVM disks, if we get passed
323 something else we raise an exception. The snapshot device can be
324 remove by calling the generic block device remove call.
325
326 """
327 cfbd = objects.Disk.FromDict(params[0])
328 return backend.BlockdevSnapshot(cfbd)
329
330 @staticmethod
339
340 @staticmethod
347
348 @staticmethod
355
356 @staticmethod
358 """Compute the sizes of the given block devices.
359
360 """
361 disk = objects.Disk.FromDict(params[0])
362 dest_node, dest_path, cluster_name = params[1:]
363 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
364
365
366
367 @staticmethod
369 """Disconnects the network connection of drbd disks.
370
371 Note that this is only valid for drbd disks, so the members of the
372 disk list must all be drbd devices.
373
374 """
375 nodes_ip, disks = params
376 disks = [objects.Disk.FromDict(cf) for cf in disks]
377 return backend.DrbdDisconnectNet(nodes_ip, disks)
378
379 @staticmethod
381 """Attaches the network connection of drbd disks.
382
383 Note that this is only valid for drbd disks, so the members of the
384 disk list must all be drbd devices.
385
386 """
387 nodes_ip, disks, instance_name, multimaster = params
388 disks = [objects.Disk.FromDict(cf) for cf in disks]
389 return backend.DrbdAttachNet(nodes_ip, disks,
390 instance_name, multimaster)
391
392 @staticmethod
394 """Wait until DRBD disks are synched.
395
396 Note that this is only valid for drbd disks, so the members of the
397 disk list must all be drbd devices.
398
399 """
400 nodes_ip, disks = params
401 disks = [objects.Disk.FromDict(cf) for cf in disks]
402 return backend.DrbdWaitSync(nodes_ip, disks)
403
404 @staticmethod
410
411
412
413 @staticmethod
428
429 @staticmethod
431 """Query information about an existing export on this node.
432
433 The given path may not contain an export, in which case we return
434 None.
435
436 """
437 path = params[0]
438 return backend.ExportInfo(path)
439
440 @staticmethod
442 """List the available exports on this node.
443
444 Note that as opposed to export_info, which may query data about an
445 export in any path, this only queries the standard Ganeti path
446 (constants.EXPORT_DIR).
447
448 """
449 return backend.ListExports()
450
451 @staticmethod
453 """Remove an export.
454
455 """
456 export = params[0]
457 return backend.RemoveExport(export)
458
459
460 @staticmethod
462 """Query the list of block devices
463
464 """
465 devices = params[0]
466 return backend.GetBlockDevSizes(devices)
467
468
469
470 @staticmethod
472 """Query the list of logical volumes in a given volume group.
473
474 """
475 vgname = params[0]
476 return backend.GetVolumeList(vgname)
477
478 @staticmethod
484
485
486
487 @staticmethod
489 """Get list of storage units.
490
491 """
492 (su_name, su_args, name, fields) = params
493 return storage.GetStorage(su_name, *su_args).List(name, fields)
494
495 @staticmethod
497 """Modify a storage unit.
498
499 """
500 (su_name, su_args, name, changes) = params
501 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
502
503 @staticmethod
505 """Execute an operation on a storage unit.
506
507 """
508 (su_name, su_args, name, op) = params
509 return storage.GetStorage(su_name, *su_args).Execute(name, op)
510
511
512
513 @staticmethod
515 """Check if all bridges given exist on this node.
516
517 """
518 bridges_list = params[0]
519 return backend.BridgesExist(bridges_list)
520
521
522
523 @staticmethod
525 """Install an OS on a given instance.
526
527 """
528 inst_s = params[0]
529 inst = objects.Instance.FromDict(inst_s)
530 reinstall = params[1]
531 debug = params[2]
532 return backend.InstanceOsAdd(inst, reinstall, debug)
533
534 @staticmethod
542
543 @staticmethod
551
552 @staticmethod
560
561 @staticmethod
568
569 @staticmethod
577
578 @staticmethod
586
587 @staticmethod
595
596 @staticmethod
604
605 @staticmethod
612
613 @staticmethod
622
623 @staticmethod
631
632 @staticmethod
638
639 @staticmethod
646
647 @staticmethod
653
654 @staticmethod
660
661
662
663 @staticmethod
665 """Checks if a node has the given ip address.
666
667 """
668 return netutils.IPAddress.Own(params[0])
669
670 @staticmethod
672 """Query node information.
673
674 """
675 (vg_names, hv_names) = params
676 return backend.GetNodeInfo(vg_names, hv_names)
677
678 @staticmethod
680 """Modify a node entry in /etc/hosts.
681
682 """
683 backend.EtcHostsModify(params[0], params[1], params[2])
684
685 return True
686
687 @staticmethod
689 """Run a verify sequence on this node.
690
691 """
692 return backend.VerifyNode(params[0], params[1])
693
694 @staticmethod
700
701 @staticmethod
708
709 @staticmethod
716
717 @staticmethod
723
724 @staticmethod
726 """Change the master IP netmask.
727
728 """
729 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
730 params[3])
731
732 @staticmethod
738
739 @staticmethod
741 """Query the list of all logical volume groups.
742
743 """
744 return backend.NodeVolumes()
745
746 @staticmethod
748 """Demote a node from the master candidate role.
749
750 """
751 return backend.DemoteFromMC()
752
753 @staticmethod
755 """Tries to powercycle the nod.
756
757 """
758 hypervisor_type = params[0]
759 return backend.PowercycleNode(hypervisor_type)
760
761
762
763 @staticmethod
769
770 @staticmethod
772 """Upload a file.
773
774 Note that the backend implementation imposes strict rules on which
775 files are accepted.
776
777 """
778 return backend.UploadFile(*(params[0]))
779
780 @staticmethod
786
787 @staticmethod
798
799 @staticmethod
806
807
808
809 @staticmethod
811 """Query detailed information about existing OSes.
812
813 """
814 return backend.DiagnoseOS()
815
816 @staticmethod
824
825 @staticmethod
827 """Run a given OS' validation routine.
828
829 """
830 required, name, checks, params = params
831 return backend.ValidateOS(required, name, checks, params)
832
833
834
835 @staticmethod
837 """Run hook scripts.
838
839 """
840 hpath, phase, env = params
841 hr = backend.HooksRunner()
842 return hr.RunHooks(hpath, phase, env)
843
844
845
846 @staticmethod
854
855
856
857 @staticmethod
859 """Run test delay.
860
861 """
862 duration = params[0]
863 status, rval = utils.TestDelay(duration)
864 if not status:
865 raise backend.RPCFail(rval)
866 return rval
867
868
869
870 @staticmethod
872 """Create the file storage directory.
873
874 """
875 file_storage_dir = params[0]
876 return backend.CreateFileStorageDir(file_storage_dir)
877
878 @staticmethod
880 """Remove the file storage directory.
881
882 """
883 file_storage_dir = params[0]
884 return backend.RemoveFileStorageDir(file_storage_dir)
885
886 @staticmethod
888 """Rename the file storage directory.
889
890 """
891 old_file_storage_dir = params[0]
892 new_file_storage_dir = params[1]
893 return backend.RenameFileStorageDir(old_file_storage_dir,
894 new_file_storage_dir)
895
896
897
898 @staticmethod
899 @_RequireJobQueueLock
901 """Update job queue.
902
903 """
904 (file_name, content) = params
905 return backend.JobQueueUpdate(file_name, content)
906
907 @staticmethod
908 @_RequireJobQueueLock
914
915 @staticmethod
916 @_RequireJobQueueLock
918 """Rename a job queue file.
919
920 """
921
922 return [backend.JobQueueRename(old, new) for old, new in params[0]]
923
924
925
926 @staticmethod
928 """Validate the hypervisor parameters.
929
930 """
931 (hvname, hvparams) = params
932 return backend.ValidateHVParams(hvname, hvparams)
933
934
935
936 @staticmethod
943
944 @staticmethod
951
952
953
954 @staticmethod
956 """Starts an import daemon.
957
958 """
959 (opts_s, instance, component, (dest, dest_args)) = params
960
961 opts = objects.ImportExportOptions.FromDict(opts_s)
962
963 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
964 None, None,
965 objects.Instance.FromDict(instance),
966 component, dest,
967 _DecodeImportExportIO(dest,
968 dest_args))
969
970 @staticmethod
972 """Starts an export daemon.
973
974 """
975 (opts_s, host, port, instance, component, (source, source_args)) = params
976
977 opts = objects.ImportExportOptions.FromDict(opts_s)
978
979 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
980 host, port,
981 objects.Instance.FromDict(instance),
982 component, source,
983 _DecodeImportExportIO(source,
984 source_args))
985
986 @staticmethod
992
993 @staticmethod
999
1000 @staticmethod
1006
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  # NOTE(review): the "def" line was missing from this chunk; the signature is
  # reconstructed from the daemon.GenericMain check-function convention
  # (options, args) -- options is unused here, hence "_". Confirm upstream.
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # the string-escape codec is required later for job queue handling
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1023
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: (mainloop, server) tuple, passed through to L{ExecNoded}

  """
  # NOTE(review): the "def" line was missing from this chunk; the signature is
  # reconstructed from the daemon.GenericMain prepare-function convention --
  # confirm upstream.
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      # mlock is best-effort: fall back to the plain executor
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while this
    # breaks the job queue functionality, it should not prevent startup of
    # the whole node daemon
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1063
1064
def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  @param prep_data: (mainloop, server) tuple as returned by L{PrepNoded}

  """
  # NOTE(review): removed a stray "-" (diff residue) that prefixed the "def"
  # line and made the file unparseable.
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # always stop the server, even if the main loop raised
    server.Stop()
1074
def Main():
  """Main function for the node daemon.

  """
  # NOTE(review): the "def" line was missing from this chunk; "Main" is the
  # conventional Ganeti daemon entry point -- confirm upstream.
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
1092