1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server
55
56
# Global job queue lock; initialized lazily by _PrepareQueueLock() on
# first use and shared by all job-queue manipulating RPC handlers.
queue_lock = None
  """Try to prepare the queue lock.

  Idempotent: if the global C{queue_lock} is already set, this is a no-op.
  Errors are reported via the return value rather than raised, so callers
  can decide whether a missing queue is fatal.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    return None

  # Lock not yet initialized (or a previous attempt failed); try to
  # acquire and verify the job queue now, without requiring the lock
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err
77
  """Decorator for job queue manipulating functions.

  The wrapped function is executed with the job queue lock held in
  exclusive mode; the lock is always released afterwards.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Ensure the queue lock exists before trying to acquire it; a
    # failed initialization makes job updates impossible
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
98
113
116 """Subclass ensuring request handlers are locked in RAM.
117
118 """
123
126 """The server implementation.
127
128 This class holds all methods exposed over the RPC interface.
129
130 """
131
132
133
137
    """Handle a request.

    Only POST is accepted; the URL path (minus the leading slash)
    selects a C{perspective_*} method on this class, which receives the
    JSON-decoded request body.  The result is returned JSON-encoded as
    a (status, payload) tuple.

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # Dispatch on the path: "/foo" -> self.perspective_foo
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell the daemon (our own process) to terminate
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the exception's arguments, which are assumed to
      # already be in the correct result-tuple format
      result = err.args
    except Exception, err:
      # Catch-all: turn backend bugs into an RPC-level failure instead
      # of crashing the request handler
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
174
175
176
177 @staticmethod
    """Create a block device.

    Unserializes the disk description from the RPC parameters and hands
    it to the backend.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
                                  excl_stor)
188
189 @staticmethod
197
198 @staticmethod
206
207 @staticmethod
215
216 @staticmethod
223
224 @staticmethod
234
235 @staticmethod
245
246 @staticmethod
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    # FromDict returns None on unserializable input; reject the whole
    # call if the parent or any child failed to unserialize
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)
260
261 @staticmethod
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    # Reject the call if the parent or any child failed to unserialize
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)
275
276 @staticmethod
285
286 @staticmethod
    """Return the mirror status for a list of disks.

    Returns a list of (success, status) pairs, one per input disk, in
    the same order as the input.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        # On success the status is an object and must be serialized
        # for the wire; on failure it is already a plain value
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result
304
305 @staticmethod
307 """Expose the FindBlockDevice functionality for a disk.
308
309 This will try to find but not activate a disk.
310
311 """
312 disk = objects.Disk.FromDict(params[0])
313
314 result = backend.BlockdevFind(disk)
315 if result is None:
316 return None
317
318 return result.ToDict()
319
320 @staticmethod
322 """Create a snapshot device.
323
324 Note that this is only valid for LVM disks, if we get passed
325 something else we raise an exception. The snapshot device can be
326 remove by calling the generic block device remove call.
327
328 """
329 cfbd = objects.Disk.FromDict(params[0])
330 return backend.BlockdevSnapshot(cfbd)
331
332 @staticmethod
    """Grow a stack of devices.

    """
    # Older masters sent fewer parameters; refuse rather than guessing
    # at defaults for the missing ones
    if len(params) < 4:
      raise ValueError("Received only 3 parameters in blockdev_grow,"
                       " old master?")
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
345
346 @staticmethod
353
354 @staticmethod
361
362 @staticmethod
364 """Compute the sizes of the given block devices.
365
366 """
367 disk = objects.Disk.FromDict(params[0])
368 dest_node, dest_path, cluster_name = params[1:]
369 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
370
371 @staticmethod
379
380
381
382 @staticmethod
384 """Disconnects the network connection of drbd disks.
385
386 Note that this is only valid for drbd disks, so the members of the
387 disk list must all be drbd devices.
388
389 """
390 nodes_ip, disks = params
391 disks = [objects.Disk.FromDict(cf) for cf in disks]
392 return backend.DrbdDisconnectNet(nodes_ip, disks)
393
394 @staticmethod
396 """Attaches the network connection of drbd disks.
397
398 Note that this is only valid for drbd disks, so the members of the
399 disk list must all be drbd devices.
400
401 """
402 nodes_ip, disks, instance_name, multimaster = params
403 disks = [objects.Disk.FromDict(cf) for cf in disks]
404 return backend.DrbdAttachNet(nodes_ip, disks,
405 instance_name, multimaster)
406
407 @staticmethod
409 """Wait until DRBD disks are synched.
410
411 Note that this is only valid for drbd disks, so the members of the
412 disk list must all be drbd devices.
413
414 """
415 nodes_ip, disks = params
416 disks = [objects.Disk.FromDict(cf) for cf in disks]
417 return backend.DrbdWaitSync(nodes_ip, disks)
418
419 @staticmethod
425
426
427
428 @staticmethod
443
444 @staticmethod
446 """Query information about an existing export on this node.
447
448 The given path may not contain an export, in which case we return
449 None.
450
451 """
452 path = params[0]
453 return backend.ExportInfo(path)
454
455 @staticmethod
457 """List the available exports on this node.
458
459 Note that as opposed to export_info, which may query data about an
460 export in any path, this only queries the standard Ganeti path
461 (pathutils.EXPORT_DIR).
462
463 """
464 return backend.ListExports()
465
466 @staticmethod
468 """Remove an export.
469
470 """
471 export = params[0]
472 return backend.RemoveExport(export)
473
474
475 @staticmethod
477 """Query the list of block devices
478
479 """
480 devices = params[0]
481 return backend.GetBlockDevSizes(devices)
482
483
484
485 @staticmethod
487 """Query the list of logical volumes in a given volume group.
488
489 """
490 vgname = params[0]
491 return backend.GetVolumeList(vgname)
492
493 @staticmethod
499
500
501
502 @staticmethod
504 """Get list of storage units.
505
506 """
507 (su_name, su_args, name, fields) = params
508 return storage.GetStorage(su_name, *su_args).List(name, fields)
509
510 @staticmethod
512 """Modify a storage unit.
513
514 """
515 (su_name, su_args, name, changes) = params
516 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
517
518 @staticmethod
520 """Execute an operation on a storage unit.
521
522 """
523 (su_name, su_args, name, op) = params
524 return storage.GetStorage(su_name, *su_args).Execute(name, op)
525
526
527
528 @staticmethod
530 """Check if all bridges given exist on this node.
531
532 """
533 bridges_list = params[0]
534 return backend.BridgesExist(bridges_list)
535
536
537
538 @staticmethod
540 """Install an OS on a given instance.
541
542 """
543 inst_s = params[0]
544 inst = objects.Instance.FromDict(inst_s)
545 reinstall = params[1]
546 debug = params[2]
547 return backend.InstanceOsAdd(inst, reinstall, debug)
548
549 @staticmethod
557
558 @staticmethod
566
567 @staticmethod
575
576 @staticmethod
583
584 @staticmethod
592
593 @staticmethod
601
602 @staticmethod
610
611 @staticmethod
619
620 @staticmethod
627
628 @staticmethod
637
638 @staticmethod
646
647 @staticmethod
653
654 @staticmethod
661
662 @staticmethod
668
669 @staticmethod
675
676
677
678 @staticmethod
680 """Checks if a node has the given ip address.
681
682 """
683 return netutils.IPAddress.Own(params[0])
684
685 @staticmethod
687 """Query node information.
688
689 """
690 (vg_names, hv_names, excl_stor) = params
691 return backend.GetNodeInfo(vg_names, hv_names, excl_stor)
692
693 @staticmethod
695 """Modify a node entry in /etc/hosts.
696
697 """
698 backend.EtcHostsModify(params[0], params[1], params[2])
699
700 return True
701
702 @staticmethod
704 """Run a verify sequence on this node.
705
706 """
707 return backend.VerifyNode(params[0], params[1])
708
709 @classmethod
716
717 @staticmethod
723
724 @staticmethod
731
732 @staticmethod
739
740 @staticmethod
746
747 @staticmethod
749 """Change the master IP netmask.
750
751 """
752 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
753 params[3])
754
755 @staticmethod
761
762 @staticmethod
764 """Query the list of all logical volume groups.
765
766 """
767 return backend.NodeVolumes()
768
769 @staticmethod
771 """Demote a node from the master candidate role.
772
773 """
774 return backend.DemoteFromMC()
775
776 @staticmethod
778 """Tries to powercycle the nod.
779
780 """
781 hypervisor_type = params[0]
782 return backend.PowercycleNode(hypervisor_type)
783
784
785
786 @staticmethod
792
793 @staticmethod
795 """Upload a file.
796
797 Note that the backend implementation imposes strict rules on which
798 files are accepted.
799
800 """
801 return backend.UploadFile(*(params[0]))
802
803 @staticmethod
809
810 @staticmethod
821
822 @staticmethod
830
831 @staticmethod
838
839 @staticmethod
845
846 @staticmethod
853
854
855
856 @staticmethod
858 """Query detailed information about existing OSes.
859
860 """
861 return backend.DiagnoseOS()
862
863 @staticmethod
871
872 @staticmethod
874 """Run a given OS' validation routine.
875
876 """
877 required, name, checks, params = params
878 return backend.ValidateOS(required, name, checks, params)
879
880
881
882 @staticmethod
888
889
890
891 @staticmethod
893 """Run hook scripts.
894
895 """
896 hpath, phase, env = params
897 hr = backend.HooksRunner()
898 return hr.RunHooks(hpath, phase, env)
899
900
901
902 @staticmethod
910
911
912
913 @staticmethod
915 """Run test delay.
916
917 """
918 duration = params[0]
919 status, rval = utils.TestDelay(duration)
920 if not status:
921 raise backend.RPCFail(rval)
922 return rval
923
924
925
926 @staticmethod
928 """Create the file storage directory.
929
930 """
931 file_storage_dir = params[0]
932 return backend.CreateFileStorageDir(file_storage_dir)
933
934 @staticmethod
936 """Remove the file storage directory.
937
938 """
939 file_storage_dir = params[0]
940 return backend.RemoveFileStorageDir(file_storage_dir)
941
942 @staticmethod
944 """Rename the file storage directory.
945
946 """
947 old_file_storage_dir = params[0]
948 new_file_storage_dir = params[1]
949 return backend.RenameFileStorageDir(old_file_storage_dir,
950 new_file_storage_dir)
951
952
953
954 @staticmethod
955 @_RequireJobQueueLock
957 """Update job queue.
958
959 """
960 (file_name, content) = params
961 return backend.JobQueueUpdate(file_name, content)
962
963 @staticmethod
964 @_RequireJobQueueLock
970
971 @staticmethod
972 @_RequireJobQueueLock
974 """Rename a job queue file.
975
976 """
977
978 return [backend.JobQueueRename(old, new) for old, new in params[0]]
979
980 @staticmethod
981 @_RequireJobQueueLock
983 """Set job queue's drain flag.
984
985 """
986 (flag, ) = params
987
988 return jstore.SetDrainFlag(flag)
989
990
991
992 @staticmethod
994 """Validate the hypervisor parameters.
995
996 """
997 (hvname, hvparams) = params
998 return backend.ValidateHVParams(hvname, hvparams)
999
1000
1001
1002 @staticmethod
1004 """Creates a new X509 certificate for SSL/TLS.
1005
1006 """
1007 (validity, ) = params
1008 return backend.CreateX509Certificate(validity)
1009
1010 @staticmethod
1017
1018
1019
1020 @staticmethod
1022 """Starts an import daemon.
1023
1024 """
1025 (opts_s, instance, component, (dest, dest_args)) = params
1026
1027 opts = objects.ImportExportOptions.FromDict(opts_s)
1028
1029 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1030 None, None,
1031 objects.Instance.FromDict(instance),
1032 component, dest,
1033 _DecodeImportExportIO(dest,
1034 dest_args))
1035
1036 @staticmethod
1038 """Starts an export daemon.
1039
1040 """
1041 (opts_s, host, port, instance, component, (source, source_args)) = params
1042
1043 opts = objects.ImportExportOptions.FromDict(opts_s)
1044
1045 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1046 host, port,
1047 objects.Instance.FromDict(instance),
1048 component, source,
1049 _DecodeImportExportIO(source,
1050 source_args))
1051
1052 @staticmethod
1058
1059 @staticmethod
1065
1066 @staticmethod
1072
  """Initial checks whether to run or exit with a failure.

  """
  if args:
    # The node daemon accepts no positional arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # NOTE(review): the string-escape codec is presumably required
    # elsewhere in the daemon; fail early if the Python installation
    # is incomplete
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1089
  """Preparation node daemon function, executed with the PID file held.

  @return: (mainloop, server) tuple, later passed to ExecNoded

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      # Cannot lock memory without ctypes; fall back to the plain
      # request executor instead of aborting
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # SSL certificate/key paths come from the parsed command-line options
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of it
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1129
1130
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options (unused here)
  @param args: positional arguments (unused here)
  @param prep_data: (mainloop, server) tuple as returned by PrepNoded

  """
  # Note: the stray "-" in front of "def" (diff/patch residue) has been
  # removed; it made this line a syntax error.
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # Always stop the HTTP server, even if the main loop raised
    server.Stop()
1140
  """Main function for the node daemon.

  Builds the option parser and delegates daemonization, option parsing
  and the check/prep/exec lifecycle to daemon.GenericMain.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  # The node certificate doubles as the SSL key (same file for both)
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)
1158