1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Ganeti node daemon"""
32
33
34
35
36
37
38
39
40
41 import os
42 import sys
43 import logging
44 import signal
45 import codecs
46
47 from optparse import OptionParser
48
49 from ganeti import backend
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import errors
53 from ganeti import jstore
54 from ganeti import daemon
55 from ganeti import http
56 from ganeti import utils
57 from ganeti.storage import container
58 from ganeti import serializer
59 from ganeti import netutils
60 from ganeti import pathutils
61 from ganeti import ssconf
62
63 import ganeti.http.server
64
65
# Module-level job queue lock. It stays None until _PrepareQueueLock()
# assigns the result of jstore.InitAndVerifyQueue(must_lock=False); once set,
# later calls reuse it instead of initialising the queue again. The job-queue
# decorator takes it in exclusive mode around each wrapped call.
queue_lock = None
70 """Extend the reason trail with noded information
71
72 The trail is extended by appending the name of the noded functionality
73 """
74 assert trail is not None
75 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
76 trail.append((trail_source, reason, utils.EpochNano()))
77
80 """Try to prepare the queue lock.
81
82 @return: None for success, otherwise an exception object
83
84 """
85 global queue_lock
86
87 if queue_lock is not None:
88 return None
89
90
91 try:
92 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
93 return None
94 except EnvironmentError, err:
95 return err
96
99 """Decorator for job queue manipulating functions.
100
101 """
102 QUEUE_LOCK_TIMEOUT = 10
103
104 def wrapper(*args, **kwargs):
105
106
107 if _PrepareQueueLock() is not None:
108 raise errors.JobQueueError("Job queue failed initialization,"
109 " cannot update jobs")
110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
111 try:
112 return fn(*args, **kwargs)
113 finally:
114 queue_lock.Unlock()
115
116 return wrapper
117
132
135 """Returns value or, if evaluating to False, a default value.
136
137 Returns the given value, unless it evaluates to False. In the latter case the
138 default value is returned.
139
140 @param value: Value to return if it doesn't evaluate to False
141 @param default: Default value
142 @return: Given value or the default
143
144 """
145 if value:
146 return value
147
148 return default
149
152 """Subclass ensuring request handlers are locked in RAM.
153
154 """
159
162 """The server implementation.
163
164 This class holds all methods exposed over the RPC interface.
165
166 """
167
168
169
173
175 """Handle a request.
176
177 """
178
179 if req.request_method.upper() != http.HTTP_POST:
180 raise http.HttpBadRequest("Only the POST method is supported")
181
182 path = req.request_path
183 if path.startswith("/"):
184 path = path[1:]
185
186 method = getattr(self, "perspective_%s" % path, None)
187 if method is None:
188 raise http.HttpNotFound()
189
190 try:
191 result = (True, method(serializer.LoadJson(req.request_body)))
192
193 except backend.RPCFail, err:
194
195
196
197 result = (False, str(err))
198 except errors.QuitGanetiException, err:
199
200 logging.info("Shutting down the node daemon, arguments: %s",
201 str(err.args))
202 os.kill(self.noded_pid, signal.SIGTERM)
203
204
205 result = err.args
206 except Exception, err:
207 logging.exception("Error in RPC call")
208 result = (False, "Error while executing backend function: %s" % str(err))
209
210 return serializer.DumpJson(result)
211
212
213
214 @staticmethod
216 """Create a block device.
217
218 """
219 (bdev_s, size, owner, on_primary, info, excl_stor) = params
220 bdev = objects.Disk.FromDict(bdev_s)
221 if bdev is None:
222 raise ValueError("can't unserialize data!")
223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
224 excl_stor)
225
226 @staticmethod
234
235 @staticmethod
243
244 @staticmethod
252
253 @staticmethod
261
262 @staticmethod
269
270 @staticmethod
281
282 @staticmethod
292
293 @staticmethod
295 """Add a child to a mirror device.
296
297 Note: this is only valid for mirror devices. It's the caller's duty
298 to send a correct disk, otherwise we raise an error.
299
300 """
301 bdev_s, ndev_s = params
302 bdev = objects.Disk.FromDict(bdev_s)
303 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
304 if bdev is None or ndevs.count(None) > 0:
305 raise ValueError("can't unserialize data!")
306 return backend.BlockdevAddchildren(bdev, ndevs)
307
308 @staticmethod
310 """Remove a child from a mirror device.
311
312 This is only valid for mirror devices, of course. It's the callers
313 duty to send a correct disk, otherwise we raise an error.
314
315 """
316 bdev_s, ndev_s = params
317 bdev = objects.Disk.FromDict(bdev_s)
318 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
319 if bdev is None or ndevs.count(None) > 0:
320 raise ValueError("can't unserialize data!")
321 return backend.BlockdevRemovechildren(bdev, ndevs)
322
323 @staticmethod
332
333 @staticmethod
335 """Return the mirror status for a list of disks.
336
337 """
338 (node_disks, ) = params
339
340 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
341
342 result = []
343
344 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
345 if success:
346 result.append((success, status.ToDict()))
347 else:
348 result.append((success, status))
349
350 return result
351
352 @staticmethod
354 """Expose the FindBlockDevice functionality for a disk.
355
356 This will try to find but not activate a disk.
357
358 """
359 disk = objects.Disk.FromDict(params[0])
360
361 result = backend.BlockdevFind(disk)
362 if result is None:
363 return None
364
365 return result.ToDict()
366
367 @staticmethod
369 """Create a snapshot device.
370
371 Note that this is only valid for LVM disks, if we get passed
372 something else we raise an exception. The snapshot device can be
373 remove by calling the generic block device remove call.
374
375 """
376 cfbd = objects.Disk.FromDict(params[0])
377 return backend.BlockdevSnapshot(cfbd)
378
379 @staticmethod
381 """Grow a stack of devices.
382
383 """
384 if len(params) < 5:
385 raise ValueError("Received only %s parameters in blockdev_grow,"
386 " old master?" % len(params))
387 cfbd = objects.Disk.FromDict(params[0])
388 amount = params[1]
389 dryrun = params[2]
390 backingstore = params[3]
391 excl_stor = params[4]
392 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
393
394 @staticmethod
401
402 @staticmethod
409
410 @staticmethod
418
419
420
421 @staticmethod
423 """Disconnects the network connection of drbd disks.
424
425 Note that this is only valid for drbd disks, so the members of the
426 disk list must all be drbd devices.
427
428 """
429 (disks,) = params
430 disks = [objects.Disk.FromDict(disk) for disk in disks]
431 return backend.DrbdDisconnectNet(disks)
432
433 @staticmethod
435 """Attaches the network connection of drbd disks.
436
437 Note that this is only valid for drbd disks, so the members of the
438 disk list must all be drbd devices.
439
440 """
441 disks, instance_name, multimaster = params
442 disks = [objects.Disk.FromDict(disk) for disk in disks]
443 return backend.DrbdAttachNet(disks, instance_name, multimaster)
444
445 @staticmethod
447 """Wait until DRBD disks are synched.
448
449 Note that this is only valid for drbd disks, so the members of the
450 disk list must all be drbd devices.
451
452 """
453 (disks,) = params
454 disks = [objects.Disk.FromDict(disk) for disk in disks]
455 return backend.DrbdWaitSync(disks)
456
457 @staticmethod
459 """Checks if the drbd devices need activation
460
461 Note that this is only valid for drbd disks, so the members of the
462 disk list must all be drbd devices.
463
464 """
465 (disks,) = params
466 disks = [objects.Disk.FromDict(disk) for disk in disks]
467 return backend.DrbdNeedsActivation(disks)
468
469 @staticmethod
475
476
477
478 @staticmethod
493
494 @staticmethod
496 """Query information about an existing export on this node.
497
498 The given path may not contain an export, in which case we return
499 None.
500
501 """
502 path = params[0]
503 return backend.ExportInfo(path)
504
505 @staticmethod
507 """List the available exports on this node.
508
509 Note that as opposed to export_info, which may query data about an
510 export in any path, this only queries the standard Ganeti path
511 (pathutils.EXPORT_DIR).
512
513 """
514 return backend.ListExports()
515
516 @staticmethod
518 """Remove an export.
519
520 """
521 export = params[0]
522 return backend.RemoveExport(export)
523
524
525 @staticmethod
527 """Query the list of block devices
528
529 """
530 devices = params[0]
531 return backend.GetBlockDevSizes(devices)
532
533
534
535 @staticmethod
537 """Query the list of logical volumes in a given volume group.
538
539 """
540 vgname = params[0]
541 return backend.GetVolumeList(vgname)
542
543 @staticmethod
549
550
551
552 @staticmethod
554 """Get list of storage units.
555
556 """
557 (su_name, su_args, name, fields) = params
558 return container.GetStorage(su_name, *su_args).List(name, fields)
559
560 @staticmethod
567
568 @staticmethod
575
576
577
578 @staticmethod
580 """Check if all bridges given exist on this node.
581
582 """
583 bridges_list = params[0]
584 return backend.BridgesExist(bridges_list)
585
586
587
588 @staticmethod
590 """Install an OS on a given instance.
591
592 """
593 inst_s = params[0]
594 inst = objects.Instance.FromDict(inst_s)
595 reinstall = params[1]
596 debug = params[2]
597 return backend.InstanceOsAdd(inst, reinstall, debug)
598
599 @staticmethod
607
608 @staticmethod
618
619 @staticmethod
628
629 @staticmethod
631 """Hotplugs device to a running instance.
632
633 """
634 (idict, action, dev_type, ddict, extra, seq) = params
635 instance = objects.Instance.FromDict(idict)
636 if dev_type == constants.HOTPLUG_TARGET_DISK:
637 device = objects.Disk.FromDict(ddict)
638 elif dev_type == constants.HOTPLUG_TARGET_NIC:
639 device = objects.NIC.FromDict(ddict)
640 else:
641 assert dev_type in constants.HOTPLUG_ALL_TARGETS
642 return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
643
644 @staticmethod
651
652 @staticmethod
659
660 @staticmethod
667
668 @staticmethod
676
677 @staticmethod
685
686 @staticmethod
694
695 @staticmethod
703
704 @staticmethod
711
712 @staticmethod
724
725 @staticmethod
733
734 @staticmethod
736 """Query instance information.
737
738 """
739 (instance_name, hypervisor_name, hvparams) = params
740 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
741
742 @staticmethod
749
750 @staticmethod
752 """Query information about all instances.
753
754 """
755 (hypervisor_list, all_hvparams) = params
756 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
757
758 @staticmethod
764
765 @staticmethod
767 """Query the list of running instances.
768
769 """
770 (hypervisor_list, hvparams) = params
771 return backend.GetInstanceList(hypervisor_list, hvparams)
772
773
774
775 @staticmethod
777 """Checks if a node has the given ip address.
778
779 """
780 return netutils.IPAddress.Own(params[0])
781
782 @staticmethod
784 """Query node information.
785
786 """
787 (storage_units, hv_specs) = params
788 return backend.GetNodeInfo(storage_units, hv_specs)
789
790 @staticmethod
792 """Modify a node entry in /etc/hosts.
793
794 """
795 backend.EtcHostsModify(params[0], params[1], params[2])
796
797 return True
798
799 @staticmethod
801 """Run a verify sequence on this node.
802
803 """
804 (what, cluster_name, hvparams, node_groups, groups_cfg) = params
805 return backend.VerifyNode(what, cluster_name, hvparams,
806 node_groups, groups_cfg)
807
808 @classmethod
810 """Run a light verify sequence on this node.
811
812 This call is meant to perform a less strict verification of the node in
813 certain situations. Right now, it is invoked only when a node is just about
814 to be added to a cluster, and even then, it performs the same checks as
815 L{perspective_node_verify}.
816 """
817 return cls.perspective_node_verify(params)
818
819 @staticmethod
825
826 @staticmethod
833
834 @staticmethod
841
842 @staticmethod
848
849 @staticmethod
851 """Change the master IP netmask.
852
853 """
854 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
855 params[3])
856
857 @staticmethod
863
864 @staticmethod
866 """Query the list of all logical volume groups.
867
868 """
869 return backend.NodeVolumes()
870
871 @staticmethod
873 """Demote a node from the master candidate role.
874
875 """
876 return backend.DemoteFromMC()
877
878 @staticmethod
880 """Tries to powercycle the node.
881
882 """
883 (hypervisor_type, hvparams) = params
884 return backend.PowercycleNode(hypervisor_type, hvparams)
885
886 @staticmethod
893
894 @staticmethod
896 """Gets the node's public crypto tokens.
897
898 """
899 token_requests = params[0]
900 return backend.GetCryptoTokens(token_requests)
901
902 @staticmethod
904 """Ensure daemon is running.
905
906 """
907 (daemon_name, run) = params
908 return backend.EnsureDaemon(daemon_name, run)
909
910
911
912 @staticmethod
918
919 @staticmethod
921 """Upload a file.
922
923 Note that the backend implementation imposes strict rules on which
924 files are accepted.
925
926 """
927 return backend.UploadFile(*(params[0]))
928
929 @staticmethod
931 """Upload a file.
932
933 Note that the backend implementation imposes strict rules on which
934 files are accepted.
935
936 """
937 return backend.UploadFile(*params)
938
939 @staticmethod
945
946 @staticmethod
957
958 @staticmethod
966
967 @staticmethod
974
975 @staticmethod
981
982 @staticmethod
989
990 @staticmethod
992 """Get info on whether a file exists and its properties.
993
994 """
995 (path, ) = params
996 return backend.GetFileInfo(path)
997
998
999
1000 @staticmethod
1002 """Query detailed information about existing OSes.
1003
1004 """
1005 return backend.DiagnoseOS()
1006
1007 @staticmethod
1009 """Run a given OS' validation routine.
1010
1011 """
1012 required, name, checks, params, force_variant = params
1013 return backend.ValidateOS(required, name, checks, params, force_variant)
1014
1015 @staticmethod
1023
1024
1025
1026 @staticmethod
1028 """Query detailed information about existing extstorage providers.
1029
1030 """
1031 return backend.DiagnoseExtStorage()
1032
1033
1034
1035 @staticmethod
1037 """Run hook scripts.
1038
1039 """
1040 hpath, phase, env = params
1041 hr = backend.HooksRunner()
1042 return hr.RunHooks(hpath, phase, env)
1043
1044
1045
1046 @staticmethod
1048 """Run an iallocator script.
1049
1050 """
1051 name, idata, ial_params_dict = params
1052 ial_params = []
1053 for ial_param in ial_params_dict.items():
1054 ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
1055 iar = backend.IAllocatorRunner()
1056 return iar.Run(name, idata, ial_params)
1057
1058
1059
1060 @staticmethod
1062 """Run test delay.
1063
1064 """
1065 duration = params[0]
1066 status, rval = utils.TestDelay(duration)
1067 if not status:
1068 raise backend.RPCFail(rval)
1069 return rval
1070
1071
1072
1073 @staticmethod
1075 """Create the file storage directory.
1076
1077 """
1078 file_storage_dir = params[0]
1079 return backend.CreateFileStorageDir(file_storage_dir)
1080
1081 @staticmethod
1083 """Remove the file storage directory.
1084
1085 """
1086 file_storage_dir = params[0]
1087 return backend.RemoveFileStorageDir(file_storage_dir)
1088
1089 @staticmethod
1091 """Rename the file storage directory.
1092
1093 """
1094 old_file_storage_dir = params[0]
1095 new_file_storage_dir = params[1]
1096 return backend.RenameFileStorageDir(old_file_storage_dir,
1097 new_file_storage_dir)
1098
1099
1100
1101 @staticmethod
1102 @_RequireJobQueueLock
1104 """Update job queue.
1105
1106 """
1107 (file_name, content) = params
1108 return backend.JobQueueUpdate(file_name, content)
1109
1110 @staticmethod
1111 @_RequireJobQueueLock
1117
1118 @staticmethod
1119 @_RequireJobQueueLock
1121 """Rename a job queue file.
1122
1123 """
1124
1125 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1126
1127 @staticmethod
1128 @_RequireJobQueueLock
1130 """Set job queue's drain flag.
1131
1132 """
1133 (flag, ) = params
1134
1135 return jstore.SetDrainFlag(flag)
1136
1137
1138
1139 @staticmethod
1141 """Validate the hypervisor parameters.
1142
1143 """
1144 (hvname, hvparams) = params
1145 return backend.ValidateHVParams(hvname, hvparams)
1146
1147
1148
1149 @staticmethod
1151 """Creates a new X509 certificate for SSL/TLS.
1152
1153 """
1154 (validity, ) = params
1155 return backend.CreateX509Certificate(validity)
1156
1157 @staticmethod
1164
1165
1166
1167 @staticmethod
1169 """Starts an import daemon.
1170
1171 """
1172 (opts_s, instance, component, (dest, dest_args)) = params
1173
1174 opts = objects.ImportExportOptions.FromDict(opts_s)
1175
1176 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1177 None, None,
1178 objects.Instance.FromDict(instance),
1179 component, dest,
1180 _DecodeImportExportIO(dest,
1181 dest_args))
1182
1183 @staticmethod
1185 """Starts an export daemon.
1186
1187 """
1188 (opts_s, host, port, instance, component, (source, source_args)) = params
1189
1190 opts = objects.ImportExportOptions.FromDict(opts_s)
1191
1192 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1193 host, port,
1194 objects.Instance.FromDict(instance),
1195 component, source,
1196 _DecodeImportExportIO(source,
1197 source_args))
1198
1199 @staticmethod
1205
1206 @staticmethod
1212
1213 @staticmethod
1219
1222 """Initial checks whether to run or exit with a failure.
1223
1224 """
1225 if args:
1226 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1227 sys.argv[0])
1228 sys.exit(constants.EXIT_FAILURE)
1229 try:
1230 codecs.lookup("string-escape")
1231 except LookupError:
1232 print >> sys.stderr, ("Can't load the string-escape code which is part"
1233 " of the Python installation. Is your installation"
1234 " complete/correct? Aborting.")
1235 sys.exit(constants.EXIT_FAILURE)
1236
1239 """Callback function to verify a peer against the candidate cert map.
1240
1241 Note that we have a chicken-and-egg problem during cluster init and upgrade.
1242 This method checks whether the incoming connection comes from a master
1243 candidate by comparing it to the master certificate map in the cluster
1244 configuration. However, during cluster init and cluster upgrade there
1245 are various RPC calls done to the master node itself, before the candidate
1246 certificate list is established and the cluster configuration is written.
1247 In this case, we cannot check against the master candidate map.
1248
1249 This problem is solved by checking whether the candidate map is empty. An
1250 initialized 2.11 or higher cluster has at least one entry for the master
1251 node in the candidate map. If the map is empty, we know that we are still
1252 in the bootstrap/upgrade phase. In this case, we read the server certificate
1253 digest and compare it to the incoming request.
1254
1255 This means that after an upgrade of Ganeti, the system continues to operate
1256 like before, using server certificates only. After the client certificates
1257 are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
1258 RPC communication is switched to using client certificates and the trick of
1259 using server certificates does not work anymore.
1260
1261 @type conn: C{OpenSSL.SSL.Connection}
1262 @param conn: the OpenSSL connection object
1263 @type cert: C{OpenSSL.X509}
1264 @param cert: the peer's SSL certificate
1265 @type errdepth: integer
1266 @param errdepth: number of the step in the certificate chain starting at 0
1267 for the actual client certificate.
1268
1269 """
1270
1271
1272
1273
1274
1275
1276 if errdepth > 0:
1277 server_digest = utils.GetCertificateDigest(
1278 cert_filename=pathutils.NODED_CERT_FILE)
1279 match = cert.digest("sha1") == server_digest
1280 if not match:
1281 logging.debug("Received certificate from the certificate chain, which"
1282 " does not match the server certficate. Digest of the"
1283 " received certificate: %s. Digest of the server"
1284 " certificate: %s.", cert.digest("sha1"), server_digest)
1285 return match
1286 elif errdepth == 0:
1287 sstore = ssconf.SimpleStore()
1288 try:
1289 candidate_certs = sstore.GetMasterCandidatesCertMap()
1290 except errors.ConfigurationError:
1291 logging.info("No candidate certificates found. Switching to "
1292 "bootstrap/update mode.")
1293 candidate_certs = None
1294 if not candidate_certs:
1295 candidate_certs = {
1296 constants.CRYPTO_BOOTSTRAP: utils.GetCertificateDigest(
1297 cert_filename=pathutils.NODED_CERT_FILE)}
1298 match = cert.digest("sha1") in candidate_certs.values()
1299 if not match:
1300 logging.debug("Received certificate which is not a certificate of a"
1301 " master candidate. Certificate digest: %s. List of master"
1302 " candidate certificate digests: %s.", cert.digest("sha1"),
1303 str(candidate_certs))
1304 return match
1305 else:
1306 logging.error("Invalid errdepth value: %s.", errdepth)
1307 return False
1308
1312 """Preparation node daemon function, executed with the PID file held.
1313
1314 """
1315 if options.mlock:
1316 request_executor_class = MlockallRequestExecutor
1317 try:
1318 utils.Mlockall()
1319 except errors.NoCtypesError:
1320 logging.warning("Cannot set memory lock, ctypes module not found")
1321 request_executor_class = http.server.HttpServerRequestExecutor
1322 else:
1323 request_executor_class = http.server.HttpServerRequestExecutor
1324
1325
1326 if options.ssl:
1327 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1328 ssl_cert_path=options.ssl_cert)
1329 else:
1330 ssl_params = None
1331
1332 err = _PrepareQueueLock()
1333 if err is not None:
1334
1335
1336
1337 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1338
1339 handler = NodeRequestHandler()
1340
1341 mainloop = daemon.Mainloop()
1342 server = \
1343 http.server.HttpServer(mainloop, options.bind_address, options.port,
1344 handler, ssl_params=ssl_params, ssl_verify_peer=True,
1345 request_executor_class=request_executor_class,
1346 ssl_verify_callback=SSLVerifyPeer)
1347 server.Start()
1348
1349 return (mainloop, server)
1350
1351
def ExecNoded(options, args, prep_data):
  """Run the node daemon's main loop, executed with the PID file held.

  @param options: parsed command line options (not used here)
  @param args: remaining command line arguments (not used here)
  @param prep_data: a C{(mainloop, server)} pair, presumably the value
      returned by L{PrepNoded}

  """
  loop, http_server = prep_data
  try:
    loop.Run()
  finally:
    # Stop the server however the main loop ended (normal return or an
    # exception propagating out of it).
    http_server.Stop()
1361
1364 """Main function for the node daemon.
1365
1366 """
1367 parser = OptionParser(description="Ganeti node daemon",
1368 usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1369 " [-i INTERFACE]"),
1370 version="%%prog (ganeti) %s" %
1371 constants.RELEASE_VERSION)
1372 parser.add_option("--no-mlock", dest="mlock",
1373 help="Do not mlock the node memory in ram",
1374 default=True, action="store_false")
1375
1376 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1377 default_ssl_cert=pathutils.NODED_CERT_FILE,
1378 default_ssl_key=pathutils.NODED_CERT_FILE,
1379 console_logging=True,
1380 warn_breach=True)
1381