31 """Ganeti node daemon"""
32
33
34
35
36
37
38
39
40
41 import os
42 import sys
43 import logging
44 import signal
45 import codecs
46
47 from optparse import OptionParser
48
49 from ganeti import backend
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import errors
53 from ganeti import jstore
54 from ganeti import daemon
55 from ganeti import http
56 from ganeti import utils
57 from ganeti.storage import container
58 from ganeti import serializer
59 from ganeti import netutils
60 from ganeti import pathutils
61 from ganeti import ssconf
62
63 import ganeti.http.server
64
65
66 queue_lock = None
70 """Extend the reason trail with noded information
71
72 The trail is extended by appending the name of the noded functionality
73 """
74 assert trail is not None
75 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
76 trail.append((trail_source, reason, utils.EpochNano()))
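
# Illustrative sketch only (not from the original file): assuming a reason
# trail that already holds (source, reason, timestamp) tuples, a perspective
# method could extend it before handing it to the backend, e.g.:
#
#   trail = [("gnt:client", "user request", 1234567890)]
#   _extendReasonTrail(trail, "instance_start")
#
# which appends ("<constants.OPCODE_REASON_SRC_NODED>:instance_start", "",
# utils.EpochNano()) as the newest trail entry.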


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    return None

  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
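
# Usage sketch (not from the original file, but mirroring the decorated
# perspective_jobqueue_* methods further below): stacking the decorator under
# @staticmethod makes every job queue manipulation take the exclusive queue
# lock first, waiting up to QUEUE_LOCK_TIMEOUT seconds, and release it when
# the wrapped call returns or raises:
#
#   @staticmethod
#   @_RequireJobQueueLock
#   def perspective_jobqueue_update(params):
#     ...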


def _DefaultAlternative(value, default):
  """Returns value or, if evaluating to False, a default value.

  Returns the given value, unless it evaluates to False. In the latter case
  the default value is returned.

  @param value: Value to return if it doesn't evaluate to False
  @param default: Default value
  @return: Given value or the default

  """
  if value:
    return value

  return default
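
# For example (illustrative values only):
#
#   _DefaultAlternative([], ["fallback"])    -> ["fallback"]
#   _DefaultAlternative("value", "fallback") -> "value"
#   _DefaultAlternative(None, 42)            -> 42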


class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """


class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))
    except backend.RPCFail, err:
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
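
  # Dispatch sketch (assumption, following the getattr() lookup above): a
  # POST to "/blockdev_create" whose body is the JSON-encoded params list is
  # routed to perspective_blockdev_create(params), and the serialized reply
  # is either (True, <backend result>) or (False, <error message>).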

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
                                  excl_stor)
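
  # Example payload (hypothetical values): the client POSTs to
  # /blockdev_create with a JSON body of the form
  #
  #   [<disk dict from objects.Disk.ToDict()>, 1024, "instance1.example.com",
  #    True, "disk info", False]
  #
  # matching the (bdev_s, size, owner, on_primary, info, excl_stor) unpacking
  # above.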

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result
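
  # Result shape (illustrative): one (success, payload) pair per disk, where
  # payload is the serialized status dict on success and the raw error value
  # otherwise, e.g. [(True, {...}), (False, "disk not found")].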

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks; if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    excl_stor = params[4]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdDisconnectNet(disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdAttachNet(disks, instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdWaitSync(disks)

  @staticmethod
  def perspective_drbd_needs_activation(params):
    """Checks if the drbd devices need activation.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdNeedsActivation(disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Query the sizes of the given block devices.

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return container.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs a device to a running instance.

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    (instance_name, hypervisor_name, hvparams) = params
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    (hypervisor_list, all_hvparams) = params
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    (hypervisor_list, hvparams) = params
    return backend.GetInstanceList(hypervisor_list, hvparams)

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given IP address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (storage_units, hv_specs) = params
    return backend.GetNodeInfo(storage_units, hv_specs)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
    return backend.VerifyNode(what, cluster_name, hvparams,
                              node_groups, groups_cfg)

  @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    This call is meant to perform a less strict verification of the node in
    certain situations. Right now, it is invoked only when a node is just
    about to be added to a cluster, and even then, it performs the same
    checks as L{perspective_node_verify}.

    """
    return cls.perspective_node_verify(params)

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    (hypervisor_type, hvparams) = params
    return backend.PowercycleNode(hypervisor_type, hvparams)

  @staticmethod
  def perspective_node_crypto_tokens(params):
    """Gets the node's public crypto tokens.

    """
    token_requests = params[0]
    return backend.GetCryptoTokens(token_requests)

  @staticmethod
  def perspective_node_ensure_daemon(params):
    """Ensure daemon is running.

    """
    (daemon_name, run) = params
    return backend.EnsureDaemon(daemon_name, run)

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata, ial_params_dict = params
    ial_params = []
    for ial_param in ial_params_dict.items():
      ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata, ial_params)
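
  # For example (illustrative values): an ial_params_dict of {"max-cpus": "8"}
  # is turned by the loop above into the command line argument "--max-cpus=8"
  # before being handed to backend.IAllocatorRunner().Run().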

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set job queue's drain flag.

    """
    (flag, ) = params

    return jstore.SetDrainFlag(flag)

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)


def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate

  """
  _BOOTSTRAP = "bootstrap"
  sstore = ssconf.SimpleStore()
  try:
    candidate_certs = sstore.GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None
  if not candidate_certs:
    candidate_certs = {
      _BOOTSTRAP: utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CERT_FILE)}
  return cert.digest("sha1") in candidate_certs.values()


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class,
                           ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)
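

# Invocation sketch (assumption, based on the usage string above): the daemon
# is normally started by the Ganeti init scripts, but for debugging it can be
# run in the foreground, for example:
#
#   ganeti-noded -f -d --no-mlock
#
# where -f/-d are the generic foreground/debug options handled by
# daemon.GenericMain and --no-mlock disables locking the daemon memory in RAM.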