1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Ganeti node daemon"""
32
33
34
35
36
37
38
39
40
41 import os
42 import sys
43 import logging
44 import signal
45 import codecs
46
47 from optparse import OptionParser
48
49 from ganeti import backend
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import errors
53 from ganeti import jstore
54 from ganeti import daemon
55 from ganeti import http
56 from ganeti import utils
57 from ganeti.storage import container
58 from ganeti import serializer
59 from ganeti import netutils
60 from ganeti import pathutils
61 from ganeti import ssconf
62
63 import ganeti.http.server
64
65
66 queue_lock = None
70 """Extend the reason trail with noded information
71
72 The trail is extended by appending the name of the noded functionality
73 """
74 assert trail is not None
75 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
76 trail.append((trail_source, reason, utils.EpochNano()))
77
80 """Try to prepare the queue lock.
81
82 @return: None for success, otherwise an exception object
83
84 """
85 global queue_lock
86
87 if queue_lock is not None:
88 return None
89
90
91 try:
92 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
93 return None
94 except EnvironmentError, err:
95 return err
96
99 """Decorator for job queue manipulating functions.
100
101 """
102 QUEUE_LOCK_TIMEOUT = 10
103
104 def wrapper(*args, **kwargs):
105
106
107 if _PrepareQueueLock() is not None:
108 raise errors.JobQueueError("Job queue failed initialization,"
109 " cannot update jobs")
110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
111 try:
112 return fn(*args, **kwargs)
113 finally:
114 queue_lock.Unlock()
115
116 return wrapper
117
132
135 """Returns value or, if evaluating to False, a default value.
136
137 Returns the given value, unless it evaluates to False. In the latter case the
138 default value is returned.
139
140 @param value: Value to return if it doesn't evaluate to False
141 @param default: Default value
142 @return: Given value or the default
143
144 """
145 if value:
146 return value
147
148 return default
149
152 """Subclass ensuring request handlers are locked in RAM.
153
154 """
159
162 """The server implementation.
163
164 This class holds all methods exposed over the RPC interface.
165
166 """
167
168
169
173
175 """Handle a request.
176
177 """
178 if req.request_method.upper() != http.HTTP_POST:
179 raise http.HttpBadRequest("Only the POST method is supported")
180
181 path = req.request_path
182 if path.startswith("/"):
183 path = path[1:]
184
185 method = getattr(self, "perspective_%s" % path, None)
186 if method is None:
187 raise http.HttpNotFound()
188
189 try:
190 result = (True, method(serializer.LoadJson(req.request_body)))
191
192 except backend.RPCFail, err:
193
194
195
196 result = (False, str(err))
197 except errors.QuitGanetiException, err:
198
199 logging.info("Shutting down the node daemon, arguments: %s",
200 str(err.args))
201 os.kill(self.noded_pid, signal.SIGTERM)
202
203
204 result = err.args
205 except Exception, err:
206 logging.exception("Error in RPC call")
207 result = (False, "Error while executing backend function: %s" % str(err))
208
209 return serializer.DumpJson(result)
210
211
212
213 @staticmethod
215 """Create a block device.
216
217 """
218 (bdev_s, size, owner, on_primary, info, excl_stor) = params
219 bdev = objects.Disk.FromDict(bdev_s)
220 if bdev is None:
221 raise ValueError("can't unserialize data!")
222 return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
223 excl_stor)
224
225 @staticmethod
233
234 @staticmethod
242
243 @staticmethod
251
252 @staticmethod
259
260 @staticmethod
271
272 @staticmethod
282
283 @staticmethod
285 """Add a child to a mirror device.
286
287 Note: this is only valid for mirror devices. It's the caller's duty
288 to send a correct disk, otherwise we raise an error.
289
290 """
291 bdev_s, ndev_s = params
292 bdev = objects.Disk.FromDict(bdev_s)
293 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
294 if bdev is None or ndevs.count(None) > 0:
295 raise ValueError("can't unserialize data!")
296 return backend.BlockdevAddchildren(bdev, ndevs)
297
298 @staticmethod
300 """Remove a child from a mirror device.
301
302 This is only valid for mirror devices, of course. It's the callers
303 duty to send a correct disk, otherwise we raise an error.
304
305 """
306 bdev_s, ndev_s = params
307 bdev = objects.Disk.FromDict(bdev_s)
308 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
309 if bdev is None or ndevs.count(None) > 0:
310 raise ValueError("can't unserialize data!")
311 return backend.BlockdevRemovechildren(bdev, ndevs)
312
313 @staticmethod
322
323 @staticmethod
325 """Return the mirror status for a list of disks.
326
327 """
328 (node_disks, ) = params
329
330 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
331
332 result = []
333
334 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
335 if success:
336 result.append((success, status.ToDict()))
337 else:
338 result.append((success, status))
339
340 return result
341
342 @staticmethod
344 """Expose the FindBlockDevice functionality for a disk.
345
346 This will try to find but not activate a disk.
347
348 """
349 disk = objects.Disk.FromDict(params[0])
350
351 result = backend.BlockdevFind(disk)
352 if result is None:
353 return None
354
355 return result.ToDict()
356
357 @staticmethod
359 """Create a snapshot device.
360
361 Note that this is only valid for LVM disks, if we get passed
362 something else we raise an exception. The snapshot device can be
363 remove by calling the generic block device remove call.
364
365 """
366 cfbd = objects.Disk.FromDict(params[0])
367 return backend.BlockdevSnapshot(cfbd)
368
369 @staticmethod
371 """Grow a stack of devices.
372
373 """
374 if len(params) < 5:
375 raise ValueError("Received only %s parameters in blockdev_grow,"
376 " old master?" % len(params))
377 cfbd = objects.Disk.FromDict(params[0])
378 amount = params[1]
379 dryrun = params[2]
380 backingstore = params[3]
381 excl_stor = params[4]
382 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
383
384 @staticmethod
391
392 @staticmethod
399
400 @staticmethod
402 """Compute the sizes of the given block devices.
403
404 """
405 disk = objects.Disk.FromDict(params[0])
406 dest_node_ip, dest_path, cluster_name = params[1:]
407 return backend.BlockdevExport(disk, dest_node_ip, dest_path, cluster_name)
408
409 @staticmethod
417
418
419
420 @staticmethod
422 """Disconnects the network connection of drbd disks.
423
424 Note that this is only valid for drbd disks, so the members of the
425 disk list must all be drbd devices.
426
427 """
428 (disks,) = params
429 disks = [objects.Disk.FromDict(disk) for disk in disks]
430 return backend.DrbdDisconnectNet(disks)
431
432 @staticmethod
434 """Attaches the network connection of drbd disks.
435
436 Note that this is only valid for drbd disks, so the members of the
437 disk list must all be drbd devices.
438
439 """
440 disks, instance_name, multimaster = params
441 disks = [objects.Disk.FromDict(disk) for disk in disks]
442 return backend.DrbdAttachNet(disks, instance_name, multimaster)
443
444 @staticmethod
446 """Wait until DRBD disks are synched.
447
448 Note that this is only valid for drbd disks, so the members of the
449 disk list must all be drbd devices.
450
451 """
452 (disks,) = params
453 disks = [objects.Disk.FromDict(disk) for disk in disks]
454 return backend.DrbdWaitSync(disks)
455
456 @staticmethod
458 """Checks if the drbd devices need activation
459
460 Note that this is only valid for drbd disks, so the members of the
461 disk list must all be drbd devices.
462
463 """
464 (disks,) = params
465 disks = [objects.Disk.FromDict(disk) for disk in disks]
466 return backend.DrbdNeedsActivation(disks)
467
468 @staticmethod
474
475
476
477 @staticmethod
492
493 @staticmethod
495 """Query information about an existing export on this node.
496
497 The given path may not contain an export, in which case we return
498 None.
499
500 """
501 path = params[0]
502 return backend.ExportInfo(path)
503
504 @staticmethod
506 """List the available exports on this node.
507
508 Note that as opposed to export_info, which may query data about an
509 export in any path, this only queries the standard Ganeti path
510 (pathutils.EXPORT_DIR).
511
512 """
513 return backend.ListExports()
514
515 @staticmethod
517 """Remove an export.
518
519 """
520 export = params[0]
521 return backend.RemoveExport(export)
522
523
524 @staticmethod
526 """Query the list of block devices
527
528 """
529 devices = params[0]
530 return backend.GetBlockDevSizes(devices)
531
532
533
534 @staticmethod
536 """Query the list of logical volumes in a given volume group.
537
538 """
539 vgname = params[0]
540 return backend.GetVolumeList(vgname)
541
542 @staticmethod
548
549
550
551 @staticmethod
553 """Get list of storage units.
554
555 """
556 (su_name, su_args, name, fields) = params
557 return container.GetStorage(su_name, *su_args).List(name, fields)
558
559 @staticmethod
566
567 @staticmethod
574
575
576
577 @staticmethod
579 """Check if all bridges given exist on this node.
580
581 """
582 bridges_list = params[0]
583 return backend.BridgesExist(bridges_list)
584
585
586
587 @staticmethod
589 """Install an OS on a given instance.
590
591 """
592 inst_s = params[0]
593 inst = objects.Instance.FromDict(inst_s)
594 reinstall = params[1]
595 debug = params[2]
596 return backend.InstanceOsAdd(inst, reinstall, debug)
597
598 @staticmethod
606
607 @staticmethod
617
618 @staticmethod
627
628 @staticmethod
630 """Hotplugs device to a running instance.
631
632 """
633 (idict, action, dev_type, ddict, extra, seq) = params
634 instance = objects.Instance.FromDict(idict)
635 if dev_type == constants.HOTPLUG_TARGET_DISK:
636 device = objects.Disk.FromDict(ddict)
637 elif dev_type == constants.HOTPLUG_TARGET_NIC:
638 device = objects.NIC.FromDict(ddict)
639 else:
640 assert dev_type in constants.HOTPLUG_ALL_TARGETS
641 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
642
643 @staticmethod
650
651 @staticmethod
658
659 @staticmethod
667
668 @staticmethod
676
677 @staticmethod
685
686 @staticmethod
694
695 @staticmethod
702
703 @staticmethod
715
716 @staticmethod
724
725 @staticmethod
727 """Query instance information.
728
729 """
730 (instance_name, hypervisor_name, hvparams) = params
731 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
732
733 @staticmethod
740
741 @staticmethod
743 """Query information about all instances.
744
745 """
746 (hypervisor_list, all_hvparams) = params
747 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
748
749 @staticmethod
751 """Query the list of running instances.
752
753 """
754 (hypervisor_list, hvparams) = params
755 return backend.GetInstanceList(hypervisor_list, hvparams)
756
757
758
759 @staticmethod
761 """Checks if a node has the given ip address.
762
763 """
764 return netutils.IPAddress.Own(params[0])
765
766 @staticmethod
768 """Query node information.
769
770 """
771 (storage_units, hv_specs) = params
772 return backend.GetNodeInfo(storage_units, hv_specs)
773
774 @staticmethod
776 """Modify a node entry in /etc/hosts.
777
778 """
779 backend.EtcHostsModify(params[0], params[1], params[2])
780
781 return True
782
783 @staticmethod
785 """Run a verify sequence on this node.
786
787 """
788 (what, cluster_name, hvparams) = params
789 return backend.VerifyNode(what, cluster_name, hvparams)
790
791 @classmethod
798
799 @staticmethod
805
806 @staticmethod
813
814 @staticmethod
821
822 @staticmethod
828
829 @staticmethod
831 """Change the master IP netmask.
832
833 """
834 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
835 params[3])
836
837 @staticmethod
843
844 @staticmethod
846 """Query the list of all logical volume groups.
847
848 """
849 return backend.NodeVolumes()
850
851 @staticmethod
853 """Demote a node from the master candidate role.
854
855 """
856 return backend.DemoteFromMC()
857
858 @staticmethod
860 """Tries to powercycle the node.
861
862 """
863 (hypervisor_type, hvparams) = params
864 return backend.PowercycleNode(hypervisor_type, hvparams)
865
866 @staticmethod
873
874
875
876 @staticmethod
882
883 @staticmethod
885 """Upload a file.
886
887 Note that the backend implementation imposes strict rules on which
888 files are accepted.
889
890 """
891 return backend.UploadFile(*(params[0]))
892
893 @staticmethod
899
900 @staticmethod
911
912 @staticmethod
920
921 @staticmethod
928
929 @staticmethod
935
936 @staticmethod
943
944
945
946 @staticmethod
948 """Query detailed information about existing OSes.
949
950 """
951 return backend.DiagnoseOS()
952
953 @staticmethod
961
962 @staticmethod
964 """Run a given OS' validation routine.
965
966 """
967 required, name, checks, params = params
968 return backend.ValidateOS(required, name, checks, params)
969
970
971
972 @staticmethod
978
979
980
981 @staticmethod
983 """Run hook scripts.
984
985 """
986 hpath, phase, env = params
987 hr = backend.HooksRunner()
988 return hr.RunHooks(hpath, phase, env)
989
990
991
992 @staticmethod
1000
1001
1002
1003 @staticmethod
1005 """Run test delay.
1006
1007 """
1008 duration = params[0]
1009 status, rval = utils.TestDelay(duration)
1010 if not status:
1011 raise backend.RPCFail(rval)
1012 return rval
1013
1014
1015
1016 @staticmethod
1018 """Create the file storage directory.
1019
1020 """
1021 file_storage_dir = params[0]
1022 return backend.CreateFileStorageDir(file_storage_dir)
1023
1024 @staticmethod
1026 """Remove the file storage directory.
1027
1028 """
1029 file_storage_dir = params[0]
1030 return backend.RemoveFileStorageDir(file_storage_dir)
1031
1032 @staticmethod
1034 """Rename the file storage directory.
1035
1036 """
1037 old_file_storage_dir = params[0]
1038 new_file_storage_dir = params[1]
1039 return backend.RenameFileStorageDir(old_file_storage_dir,
1040 new_file_storage_dir)
1041
1042
1043
1044 @staticmethod
1045 @_RequireJobQueueLock
1047 """Update job queue.
1048
1049 """
1050 (file_name, content) = params
1051 return backend.JobQueueUpdate(file_name, content)
1052
1053 @staticmethod
1054 @_RequireJobQueueLock
1060
1061 @staticmethod
1062 @_RequireJobQueueLock
1064 """Rename a job queue file.
1065
1066 """
1067
1068 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1069
1070 @staticmethod
1071 @_RequireJobQueueLock
1073 """Set job queue's drain flag.
1074
1075 """
1076 (flag, ) = params
1077
1078 return jstore.SetDrainFlag(flag)
1079
1080
1081
1082 @staticmethod
1084 """Validate the hypervisor parameters.
1085
1086 """
1087 (hvname, hvparams) = params
1088 return backend.ValidateHVParams(hvname, hvparams)
1089
1090
1091
1092 @staticmethod
1094 """Creates a new X509 certificate for SSL/TLS.
1095
1096 """
1097 (validity, ) = params
1098 return backend.CreateX509Certificate(validity)
1099
1100 @staticmethod
1107
1108
1109
1110 @staticmethod
1112 """Starts an import daemon.
1113
1114 """
1115 (opts_s, instance, component, (dest, dest_args)) = params
1116
1117 opts = objects.ImportExportOptions.FromDict(opts_s)
1118
1119 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1120 None, None,
1121 objects.Instance.FromDict(instance),
1122 component, dest,
1123 _DecodeImportExportIO(dest,
1124 dest_args))
1125
1126 @staticmethod
1128 """Starts an export daemon.
1129
1130 """
1131 (opts_s, host, port, instance, component, (source, source_args)) = params
1132
1133 opts = objects.ImportExportOptions.FromDict(opts_s)
1134
1135 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1136 host, port,
1137 objects.Instance.FromDict(instance),
1138 component, source,
1139 _DecodeImportExportIO(source,
1140 source_args))
1141
1142 @staticmethod
1148
1149 @staticmethod
1155
1156 @staticmethod
1162
1165 """Initial checks whether to run or exit with a failure.
1166
1167 """
1168 if args:
1169 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1170 sys.argv[0])
1171 sys.exit(constants.EXIT_FAILURE)
1172 try:
1173 codecs.lookup("string-escape")
1174 except LookupError:
1175 print >> sys.stderr, ("Can't load the string-escape code which is part"
1176 " of the Python installation. Is your installation"
1177 " complete/correct? Aborting.")
1178 sys.exit(constants.EXIT_FAILURE)
1179
1182 """Preparation node daemon function, executed with the PID file held.
1183
1184 """
1185 if options.mlock:
1186 request_executor_class = MlockallRequestExecutor
1187 try:
1188 utils.Mlockall()
1189 except errors.NoCtypesError:
1190 logging.warning("Cannot set memory lock, ctypes module not found")
1191 request_executor_class = http.server.HttpServerRequestExecutor
1192 else:
1193 request_executor_class = http.server.HttpServerRequestExecutor
1194
1195
1196 if options.ssl:
1197 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1198 ssl_cert_path=options.ssl_cert)
1199 else:
1200 ssl_params = None
1201
1202 err = _PrepareQueueLock()
1203 if err is not None:
1204
1205
1206
1207 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1208
1209 handler = NodeRequestHandler()
1210
1211 mainloop = daemon.Mainloop()
1212 server = \
1213 http.server.HttpServer(mainloop, options.bind_address, options.port,
1214 handler, ssl_params=ssl_params, ssl_verify_peer=True,
1215 request_executor_class=request_executor_class)
1216 server.Start()
1217
1218 return (mainloop, server)
1219
1220
1221 -def ExecNoded(options, args, prep_data):
1222 """Main node daemon function, executed with the PID file held.
1223
1224 """
1225 (mainloop, server) = prep_data
1226 try:
1227 mainloop.Run()
1228 finally:
1229 server.Stop()
1230
1233 """Main function for the node daemon.
1234
1235 """
1236 parser = OptionParser(description="Ganeti node daemon",
1237 usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1238 " [-i INTERFACE]"),
1239 version="%%prog (ganeti) %s" %
1240 constants.RELEASE_VERSION)
1241 parser.add_option("--no-mlock", dest="mlock",
1242 help="Do not mlock the node memory in ram",
1243 default=True, action="store_false")
1244
1245 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1246 default_ssl_cert=pathutils.NODED_CERT_FILE,
1247 default_ssl_key=pathutils.NODED_CERT_FILE,
1248 console_logging=True)
1249