1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server
55
56
57 queue_lock = None
61 """Extend the reason trail with noded information
62
63 The trail is extended by appending the name of the noded functionality
64 """
65 assert trail is not None
66 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67 trail.append((trail_source, reason, utils.EpochNano()))
68
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  The lock object is created at most once and kept in the module-level
  C{queue_lock} variable; once it is set, later calls return immediately.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    # Already initialised by an earlier call, nothing left to do
    return None

  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError as err:
    # The error is handed back instead of raised, the caller decides
    # whether it is fatal
    return err
87
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The wrapped function only runs while the job queue lock is held
  exclusively; the lock is released again when the function returns or
  raises.

  @param fn: function to wrap
  @return: wrapper taking the same arguments as C{fn} and returning its result
  @raise errors.JobQueueError: raised by the wrapper if the queue lock could
      not be prepared

  """
  # Imported locally so this block does not depend on the file's import list
  import functools

  QUEUE_LOCK_TIMEOUT = 10

  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    # Refuse to touch the queue if the lock could not be set up
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # Always release the lock, even if fn raised
      queue_lock.Unlock()

  return wrapper
108
123
126 """Returns value or, if evaluating to False, a default value.
127
128 Returns the given value, unless it evaluates to False. In the latter case the
129 default value is returned.
130
131 @param value: Value to return if it doesn't evaluate to False
132 @param default: Default value
133 @return: Given value or the default
134
135 """
136 if value:
137 return value
138
139 return default
140
143 """Subclass ensuring request handlers are locked in RAM.
144
145 """
150
153 """The server implementation.
154
155 This class holds all methods exposed over the RPC interface.
156
157 """
158
159
160
164
166 """Handle a request.
167
168 """
169 if req.request_method.upper() != http.HTTP_POST:
170 raise http.HttpBadRequest("Only the POST method is supported")
171
172 path = req.request_path
173 if path.startswith("/"):
174 path = path[1:]
175
176 method = getattr(self, "perspective_%s" % path, None)
177 if method is None:
178 raise http.HttpNotFound()
179
180 try:
181 result = (True, method(serializer.LoadJson(req.request_body)))
182
183 except backend.RPCFail, err:
184
185
186
187 result = (False, str(err))
188 except errors.QuitGanetiException, err:
189
190 logging.info("Shutting down the node daemon, arguments: %s",
191 str(err.args))
192 os.kill(self.noded_pid, signal.SIGTERM)
193
194
195 result = err.args
196 except Exception, err:
197 logging.exception("Error in RPC call")
198 result = (False, "Error while executing backend function: %s" % str(err))
199
200 return serializer.DumpJson(result)
201
202
203
204 @staticmethod
206 """Create a block device.
207
208 """
209 (bdev_s, size, owner, on_primary, info, excl_stor) = params
210 bdev = objects.Disk.FromDict(bdev_s)
211 if bdev is None:
212 raise ValueError("can't unserialize data!")
213 return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214 excl_stor)
215
216 @staticmethod
224
225 @staticmethod
233
234 @staticmethod
242
243 @staticmethod
250
251 @staticmethod
261
262 @staticmethod
272
273 @staticmethod
275 """Add a child to a mirror device.
276
277 Note: this is only valid for mirror devices. It's the caller's duty
278 to send a correct disk, otherwise we raise an error.
279
280 """
281 bdev_s, ndev_s = params
282 bdev = objects.Disk.FromDict(bdev_s)
283 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284 if bdev is None or ndevs.count(None) > 0:
285 raise ValueError("can't unserialize data!")
286 return backend.BlockdevAddchildren(bdev, ndevs)
287
288 @staticmethod
290 """Remove a child from a mirror device.
291
292 This is only valid for mirror devices, of course. It's the caller's
293 duty to send a correct disk, otherwise we raise an error.
294
295 """
296 bdev_s, ndev_s = params
297 bdev = objects.Disk.FromDict(bdev_s)
298 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299 if bdev is None or ndevs.count(None) > 0:
300 raise ValueError("can't unserialize data!")
301 return backend.BlockdevRemovechildren(bdev, ndevs)
302
303 @staticmethod
312
313 @staticmethod
315 """Return the mirror status for a list of disks.
316
317 """
318 (node_disks, ) = params
319
320 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322 result = []
323
324 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325 if success:
326 result.append((success, status.ToDict()))
327 else:
328 result.append((success, status))
329
330 return result
331
332 @staticmethod
334 """Expose the FindBlockDevice functionality for a disk.
335
336 This will try to find but not activate a disk.
337
338 """
339 disk = objects.Disk.FromDict(params[0])
340
341 result = backend.BlockdevFind(disk)
342 if result is None:
343 return None
344
345 return result.ToDict()
346
347 @staticmethod
349 """Create a snapshot device.
350
351 Note that this is only valid for LVM disks, if we get passed
352 something else we raise an exception. The snapshot device can be
353 removed by calling the generic block device remove call.
354
355 """
356 cfbd = objects.Disk.FromDict(params[0])
357 return backend.BlockdevSnapshot(cfbd)
358
359 @staticmethod
361 """Grow a stack of devices.
362
363 """
364 if len(params) < 5:
365 raise ValueError("Received only %s parameters in blockdev_grow,"
366 " old master?" % len(params))
367 cfbd = objects.Disk.FromDict(params[0])
368 amount = params[1]
369 dryrun = params[2]
370 backingstore = params[3]
371 excl_stor = params[4]
372 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
373
374 @staticmethod
381
382 @staticmethod
389
390 @staticmethod
392 """Export a block device to a path on another node.
393
394 """
395 disk = objects.Disk.FromDict(params[0])
396 dest_node_ip, dest_path, cluster_name = params[1:]
397 return backend.BlockdevExport(disk, dest_node_ip, dest_path, cluster_name)
398
399 @staticmethod
407
408
409
410 @staticmethod
412 """Disconnects the network connection of drbd disks.
413
414 Note that this is only valid for drbd disks, so the members of the
415 disk list must all be drbd devices.
416
417 """
418 nodes_ip, disks, target_node_uuid = params
419 disks = [objects.Disk.FromDict(cf) for cf in disks]
420 return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
421
422 @staticmethod
424 """Attaches the network connection of drbd disks.
425
426 Note that this is only valid for drbd disks, so the members of the
427 disk list must all be drbd devices.
428
429 """
430 nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
431 disks = [objects.Disk.FromDict(cf) for cf in disks]
432 return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
433 instance_name, multimaster)
434
435 @staticmethod
437 """Wait until DRBD disks are synched.
438
439 Note that this is only valid for drbd disks, so the members of the
440 disk list must all be drbd devices.
441
442 """
443 nodes_ip, disks, target_node_uuid = params
444 disks = [objects.Disk.FromDict(cf) for cf in disks]
445 return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
446
447 @staticmethod
449 """Checks if the drbd devices need activation
450
451 Note that this is only valid for drbd disks, so the members of the
452 disk list must all be drbd devices.
453
454 """
455 nodes_ip, disks, target_node_uuid = params
456 disks = [objects.Disk.FromDict(cf) for cf in disks]
457 return backend.DrbdNeedsActivation(target_node_uuid, nodes_ip, disks)
458
459 @staticmethod
465
466
467
468 @staticmethod
483
484 @staticmethod
486 """Query information about an existing export on this node.
487
488 The given path may not contain an export, in which case we return
489 None.
490
491 """
492 path = params[0]
493 return backend.ExportInfo(path)
494
495 @staticmethod
497 """List the available exports on this node.
498
499 Note that as opposed to export_info, which may query data about an
500 export in any path, this only queries the standard Ganeti path
501 (pathutils.EXPORT_DIR).
502
503 """
504 return backend.ListExports()
505
506 @staticmethod
508 """Remove an export.
509
510 """
511 export = params[0]
512 return backend.RemoveExport(export)
513
514
515 @staticmethod
517 """Query the list of block devices
518
519 """
520 devices = params[0]
521 return backend.GetBlockDevSizes(devices)
522
523
524
525 @staticmethod
527 """Query the list of logical volumes in a given volume group.
528
529 """
530 vgname = params[0]
531 return backend.GetVolumeList(vgname)
532
533 @staticmethod
539
540
541
542 @staticmethod
544 """Get list of storage units.
545
546 """
547 (su_name, su_args, name, fields) = params
548 return container.GetStorage(su_name, *su_args).List(name, fields)
549
550 @staticmethod
557
558 @staticmethod
565
566
567
568 @staticmethod
570 """Check if all bridges given exist on this node.
571
572 """
573 bridges_list = params[0]
574 return backend.BridgesExist(bridges_list)
575
576
577
578 @staticmethod
580 """Install an OS on a given instance.
581
582 """
583 inst_s = params[0]
584 inst = objects.Instance.FromDict(inst_s)
585 reinstall = params[1]
586 debug = params[2]
587 return backend.InstanceOsAdd(inst, reinstall, debug)
588
589 @staticmethod
597
598 @staticmethod
608
609 @staticmethod
618
619 @staticmethod
626
627 @staticmethod
635
636 @staticmethod
644
645 @staticmethod
653
654 @staticmethod
662
663 @staticmethod
670
671 @staticmethod
683
684 @staticmethod
692
693 @staticmethod
695 """Query instance information.
696
697 """
698 (instance_name, hypervisor_name, hvparams) = params
699 return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
700
701 @staticmethod
708
709 @staticmethod
711 """Query information about all instances.
712
713 """
714 (hypervisor_list, all_hvparams) = params
715 return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
716
717 @staticmethod
719 """Query the list of running instances.
720
721 """
722 (hypervisor_list, hvparams) = params
723 return backend.GetInstanceList(hypervisor_list, hvparams)
724
725
726
727 @staticmethod
729 """Checks if a node has the given ip address.
730
731 """
732 return netutils.IPAddress.Own(params[0])
733
734 @staticmethod
736 """Query node information.
737
738 """
739 (storage_units, hv_specs) = params
740 return backend.GetNodeInfo(storage_units, hv_specs)
741
742 @staticmethod
744 """Modify a node entry in /etc/hosts.
745
746 """
747 backend.EtcHostsModify(params[0], params[1], params[2])
748
749 return True
750
751 @staticmethod
753 """Run a verify sequence on this node.
754
755 """
756 (what, cluster_name, hvparams) = params
757 return backend.VerifyNode(what, cluster_name, hvparams)
758
759 @classmethod
766
767 @staticmethod
773
774 @staticmethod
781
782 @staticmethod
789
790 @staticmethod
796
797 @staticmethod
799 """Change the master IP netmask.
800
801 """
802 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
803 params[3])
804
805 @staticmethod
811
812 @staticmethod
814 """Query the list of all logical volume groups.
815
816 """
817 return backend.NodeVolumes()
818
819 @staticmethod
821 """Demote a node from the master candidate role.
822
823 """
824 return backend.DemoteFromMC()
825
826 @staticmethod
828 """Tries to powercycle the node.
829
830 """
831 (hypervisor_type, hvparams) = params
832 return backend.PowercycleNode(hypervisor_type, hvparams)
833
834
835
836 @staticmethod
842
843 @staticmethod
845 """Upload a file.
846
847 Note that the backend implementation imposes strict rules on which
848 files are accepted.
849
850 """
851 return backend.UploadFile(*(params[0]))
852
853 @staticmethod
859
860 @staticmethod
871
872 @staticmethod
880
881 @staticmethod
888
889 @staticmethod
895
896 @staticmethod
903
904
905
906 @staticmethod
908 """Query detailed information about existing OSes.
909
910 """
911 return backend.DiagnoseOS()
912
913 @staticmethod
921
922 @staticmethod
924 """Run a given OS' validation routine.
925
926 """
927 required, name, checks, params = params
928 return backend.ValidateOS(required, name, checks, params)
929
930
931
932 @staticmethod
938
939
940
941 @staticmethod
943 """Run hook scripts.
944
945 """
946 hpath, phase, env = params
947 hr = backend.HooksRunner()
948 return hr.RunHooks(hpath, phase, env)
949
950
951
952 @staticmethod
960
961
962
963 @staticmethod
965 """Run test delay.
966
967 """
968 duration = params[0]
969 status, rval = utils.TestDelay(duration)
970 if not status:
971 raise backend.RPCFail(rval)
972 return rval
973
974
975
976 @staticmethod
978 """Create the file storage directory.
979
980 """
981 file_storage_dir = params[0]
982 return backend.CreateFileStorageDir(file_storage_dir)
983
984 @staticmethod
986 """Remove the file storage directory.
987
988 """
989 file_storage_dir = params[0]
990 return backend.RemoveFileStorageDir(file_storage_dir)
991
992 @staticmethod
994 """Rename the file storage directory.
995
996 """
997 old_file_storage_dir = params[0]
998 new_file_storage_dir = params[1]
999 return backend.RenameFileStorageDir(old_file_storage_dir,
1000 new_file_storage_dir)
1001
1002
1003
1004 @staticmethod
1005 @_RequireJobQueueLock
1007 """Update job queue.
1008
1009 """
1010 (file_name, content) = params
1011 return backend.JobQueueUpdate(file_name, content)
1012
1013 @staticmethod
1014 @_RequireJobQueueLock
1020
1021 @staticmethod
1022 @_RequireJobQueueLock
1024 """Rename a job queue file.
1025
1026 """
1027
1028 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1029
1030 @staticmethod
1031 @_RequireJobQueueLock
1033 """Set job queue's drain flag.
1034
1035 """
1036 (flag, ) = params
1037
1038 return jstore.SetDrainFlag(flag)
1039
1040
1041
1042 @staticmethod
1044 """Validate the hypervisor parameters.
1045
1046 """
1047 (hvname, hvparams) = params
1048 return backend.ValidateHVParams(hvname, hvparams)
1049
1050
1051
1052 @staticmethod
1054 """Creates a new X509 certificate for SSL/TLS.
1055
1056 """
1057 (validity, ) = params
1058 return backend.CreateX509Certificate(validity)
1059
1060 @staticmethod
1067
1068
1069
1070 @staticmethod
1072 """Starts an import daemon.
1073
1074 """
1075 (opts_s, instance, component, (dest, dest_args)) = params
1076
1077 opts = objects.ImportExportOptions.FromDict(opts_s)
1078
1079 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1080 None, None,
1081 objects.Instance.FromDict(instance),
1082 component, dest,
1083 _DecodeImportExportIO(dest,
1084 dest_args))
1085
1086 @staticmethod
1088 """Starts an export daemon.
1089
1090 """
1091 (opts_s, host, port, instance, component, (source, source_args)) = params
1092
1093 opts = objects.ImportExportOptions.FromDict(opts_s)
1094
1095 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1096 host, port,
1097 objects.Instance.FromDict(instance),
1098 component, source,
1099 _DecodeImportExportIO(source,
1100 source_args))
1101
1102 @staticmethod
1108
1109 @staticmethod
1115
1116 @staticmethod
1122
1125 """Initial checks whether to run or exit with a failure.
1126
1127 """
1128 if args:
1129 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1130 sys.argv[0])
1131 sys.exit(constants.EXIT_FAILURE)
1132 try:
1133 codecs.lookup("string-escape")
1134 except LookupError:
1135 print >> sys.stderr, ("Can't load the string-escape code which is part"
1136 " of the Python installation. Is your installation"
1137 " complete/correct? Aborting.")
1138 sys.exit(constants.EXIT_FAILURE)
1139
1142 """Preparation node daemon function, executed with the PID file held.
1143
1144 """
1145 if options.mlock:
1146 request_executor_class = MlockallRequestExecutor
1147 try:
1148 utils.Mlockall()
1149 except errors.NoCtypesError:
1150 logging.warning("Cannot set memory lock, ctypes module not found")
1151 request_executor_class = http.server.HttpServerRequestExecutor
1152 else:
1153 request_executor_class = http.server.HttpServerRequestExecutor
1154
1155
1156 if options.ssl:
1157 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1158 ssl_cert_path=options.ssl_cert)
1159 else:
1160 ssl_params = None
1161
1162 err = _PrepareQueueLock()
1163 if err is not None:
1164
1165
1166
1167 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1168
1169 handler = NodeRequestHandler()
1170
1171 mainloop = daemon.Mainloop()
1172 server = \
1173 http.server.HttpServer(mainloop, options.bind_address, options.port,
1174 handler, ssl_params=ssl_params, ssl_verify_peer=True,
1175 request_executor_class=request_executor_class)
1176 server.Start()
1177
1178 return (mainloop, server)
1179
1180
def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  Runs the main loop until it finishes and then stops the HTTP server.

  @param options: not used by this function
  @param args: not used by this function
  @param prep_data: C{(mainloop, server)} pair; the main loop must provide
      C{Run()} and the server C{Stop()}

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # Stop the server no matter how the main loop ended, including errors
    server.Stop()
1190
1193 """Main function for the node daemon.
1194
1195 """
1196 parser = OptionParser(description="Ganeti node daemon",
1197 usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1198 " [-i INTERFACE]"),
1199 version="%%prog (ganeti) %s" %
1200 constants.RELEASE_VERSION)
1201 parser.add_option("--no-mlock", dest="mlock",
1202 help="Do not mlock the node memory in ram",
1203 default=True, action="store_false")
1204
1205 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1206 default_ssl_cert=pathutils.NODED_CERT_FILE,
1207 default_ssl_key=pathutils.NODED_CERT_FILE,
1208 console_logging=True)
1209