1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti node daemon"""
23
24
25
26
27
28
29
30
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server
55
56
57 queue_lock = None
61 """Extend the reason trail with noded information
62
63 The trail is extended by appending the name of the noded functionality
64 """
65 assert trail is not None
66 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67 trail.append((trail_source, reason, utils.EpochNano()))
68
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  The lock object is created at most once and cached in the module-level
  C{queue_lock}; once it is set, later calls return immediately.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    # Already initialised by an earlier call
    return None

  # An EnvironmentError is handed back to the caller instead of being raised,
  # so the caller decides whether a broken queue is fatal
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError as err:
    return err
87
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The returned wrapper makes sure the queue lock has been prepared, takes it
  exclusively around the call to C{fn} and releases it afterwards, whether
  C{fn} returned or raised.

  @param fn: function to run while the job queue lock is held
  @return: wrapper accepting the same arguments as C{fn} and returning its
      result
  @raise errors.JobQueueError: raised by the wrapper when the queue lock
      could not be prepared

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Refuse to touch the queue if its lock could not be set up
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    # Blocking, exclusive acquisition with the timeout above (unit is not
    # visible here; presumably seconds)
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
108
123
126 """Returns value or, if evaluating to False, a default value.
127
128 Returns the given value, unless it evaluates to False. In the latter case the
129 default value is returned.
130
131 @param value: Value to return if it doesn't evaluate to False
132 @param default: Default value
133 @return: Given value or the default
134
135 """
136 if value:
137 return value
138
139 return default
140
143 """Subclass ensuring request handlers are locked in RAM.
144
145 """
150
153 """The server implementation.
154
155 This class holds all methods exposed over the RPC interface.
156
157 """
158
159
160
164
166 """Handle a request.
167
168 """
169 if req.request_method.upper() != http.HTTP_POST:
170 raise http.HttpBadRequest("Only the POST method is supported")
171
172 path = req.request_path
173 if path.startswith("/"):
174 path = path[1:]
175
176 method = getattr(self, "perspective_%s" % path, None)
177 if method is None:
178 raise http.HttpNotFound()
179
180 try:
181 result = (True, method(serializer.LoadJson(req.request_body)))
182
183 except backend.RPCFail, err:
184
185
186
187 result = (False, str(err))
188 except errors.QuitGanetiException, err:
189
190 logging.info("Shutting down the node daemon, arguments: %s",
191 str(err.args))
192 os.kill(self.noded_pid, signal.SIGTERM)
193
194
195 result = err.args
196 except Exception, err:
197 logging.exception("Error in RPC call")
198 result = (False, "Error while executing backend function: %s" % str(err))
199
200 return serializer.DumpJson(result)
201
202
203
204 @staticmethod
206 """Create a block device.
207
208 """
209 (bdev_s, size, owner, on_primary, info, excl_stor) = params
210 bdev = objects.Disk.FromDict(bdev_s)
211 if bdev is None:
212 raise ValueError("can't unserialize data!")
213 return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214 excl_stor)
215
216 @staticmethod
224
225 @staticmethod
233
234 @staticmethod
242
243 @staticmethod
250
251 @staticmethod
261
262 @staticmethod
272
273 @staticmethod
275 """Add a child to a mirror device.
276
277 Note: this is only valid for mirror devices. It's the caller's duty
278 to send a correct disk, otherwise we raise an error.
279
280 """
281 bdev_s, ndev_s = params
282 bdev = objects.Disk.FromDict(bdev_s)
283 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284 if bdev is None or ndevs.count(None) > 0:
285 raise ValueError("can't unserialize data!")
286 return backend.BlockdevAddchildren(bdev, ndevs)
287
288 @staticmethod
290 """Remove a child from a mirror device.
291
292 This is only valid for mirror devices, of course. It's the caller's
293 duty to send a correct disk, otherwise we raise an error.
294
295 """
296 bdev_s, ndev_s = params
297 bdev = objects.Disk.FromDict(bdev_s)
298 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299 if bdev is None or ndevs.count(None) > 0:
300 raise ValueError("can't unserialize data!")
301 return backend.BlockdevRemovechildren(bdev, ndevs)
302
303 @staticmethod
312
313 @staticmethod
315 """Return the mirror status for a list of disks.
316
317 """
318 (node_disks, ) = params
319
320 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322 result = []
323
324 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325 if success:
326 result.append((success, status.ToDict()))
327 else:
328 result.append((success, status))
329
330 return result
331
332 @staticmethod
334 """Expose the FindBlockDevice functionality for a disk.
335
336 This will try to find but not activate a disk.
337
338 """
339 disk = objects.Disk.FromDict(params[0])
340
341 result = backend.BlockdevFind(disk)
342 if result is None:
343 return None
344
345 return result.ToDict()
346
347 @staticmethod
349 """Create a snapshot device.
350
351 Note that this is only valid for LVM disks, if we get passed
352 something else we raise an exception. The snapshot device can be
353 removed by calling the generic block device remove call.
354
355 """
356 cfbd = objects.Disk.FromDict(params[0])
357 return backend.BlockdevSnapshot(cfbd)
358
359 @staticmethod
361 """Grow a stack of devices.
362
363 """
364 if len(params) < 4:
365 raise ValueError("Received only 3 parameters in blockdev_grow,"
366 " old master?")
367 cfbd = objects.Disk.FromDict(params[0])
368 amount = params[1]
369 dryrun = params[2]
370 backingstore = params[3]
371 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
372
373 @staticmethod
380
381 @staticmethod
388
389 @staticmethod
391 """Export a block device to a destination node.
392
393 """
394 disk = objects.Disk.FromDict(params[0])
395 dest_node, dest_path, cluster_name = params[1:]
396 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
397
398 @staticmethod
406
407
408
409 @staticmethod
411 """Disconnects the network connection of drbd disks.
412
413 Note that this is only valid for drbd disks, so the members of the
414 disk list must all be drbd devices.
415
416 """
417 nodes_ip, disks = params
418 disks = [objects.Disk.FromDict(cf) for cf in disks]
419 return backend.DrbdDisconnectNet(nodes_ip, disks)
420
421 @staticmethod
423 """Attaches the network connection of drbd disks.
424
425 Note that this is only valid for drbd disks, so the members of the
426 disk list must all be drbd devices.
427
428 """
429 nodes_ip, disks, instance_name, multimaster = params
430 disks = [objects.Disk.FromDict(cf) for cf in disks]
431 return backend.DrbdAttachNet(nodes_ip, disks,
432 instance_name, multimaster)
433
434 @staticmethod
436 """Wait until DRBD disks are synched.
437
438 Note that this is only valid for drbd disks, so the members of the
439 disk list must all be drbd devices.
440
441 """
442 nodes_ip, disks = params
443 disks = [objects.Disk.FromDict(cf) for cf in disks]
444 return backend.DrbdWaitSync(nodes_ip, disks)
445
446 @staticmethod
452
453
454
455 @staticmethod
470
471 @staticmethod
473 """Query information about an existing export on this node.
474
475 The given path may not contain an export, in which case we return
476 None.
477
478 """
479 path = params[0]
480 return backend.ExportInfo(path)
481
482 @staticmethod
484 """List the available exports on this node.
485
486 Note that as opposed to export_info, which may query data about an
487 export in any path, this only queries the standard Ganeti path
488 (pathutils.EXPORT_DIR).
489
490 """
491 return backend.ListExports()
492
493 @staticmethod
495 """Remove an export.
496
497 """
498 export = params[0]
499 return backend.RemoveExport(export)
500
501
502 @staticmethod
504 """Query the list of block devices
505
506 """
507 devices = params[0]
508 return backend.GetBlockDevSizes(devices)
509
510
511
512 @staticmethod
514 """Query the list of logical volumes in a given volume group.
515
516 """
517 vgname = params[0]
518 return backend.GetVolumeList(vgname)
519
520 @staticmethod
526
527
528
529 @staticmethod
531 """Get list of storage units.
532
533 """
534 (su_name, su_args, name, fields) = params
535 return storage.GetStorage(su_name, *su_args).List(name, fields)
536
537 @staticmethod
539 """Modify a storage unit.
540
541 """
542 (su_name, su_args, name, changes) = params
543 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
544
545 @staticmethod
547 """Execute an operation on a storage unit.
548
549 """
550 (su_name, su_args, name, op) = params
551 return storage.GetStorage(su_name, *su_args).Execute(name, op)
552
553
554
555 @staticmethod
557 """Check if all bridges given exist on this node.
558
559 """
560 bridges_list = params[0]
561 return backend.BridgesExist(bridges_list)
562
563
564
565 @staticmethod
567 """Install an OS on a given instance.
568
569 """
570 inst_s = params[0]
571 inst = objects.Instance.FromDict(inst_s)
572 reinstall = params[1]
573 debug = params[2]
574 return backend.InstanceOsAdd(inst, reinstall, debug)
575
576 @staticmethod
584
585 @staticmethod
595
596 @staticmethod
605
606 @staticmethod
613
614 @staticmethod
622
623 @staticmethod
631
632 @staticmethod
640
641 @staticmethod
649
650 @staticmethod
657
658 @staticmethod
670
671 @staticmethod
679
680 @staticmethod
686
687 @staticmethod
694
695 @staticmethod
701
702 @staticmethod
708
709
710
711 @staticmethod
713 """Checks if a node has the given ip address.
714
715 """
716 return netutils.IPAddress.Own(params[0])
717
718 @staticmethod
720 """Query node information.
721
722 """
723 (vg_names, hv_names, excl_stor) = params
724 return backend.GetNodeInfo(vg_names, hv_names, excl_stor)
725
726 @staticmethod
728 """Modify a node entry in /etc/hosts.
729
730 """
731 backend.EtcHostsModify(params[0], params[1], params[2])
732
733 return True
734
735 @staticmethod
737 """Run a verify sequence on this node.
738
739 """
740 return backend.VerifyNode(params[0], params[1])
741
742 @classmethod
749
750 @staticmethod
756
757 @staticmethod
764
765 @staticmethod
772
773 @staticmethod
779
780 @staticmethod
782 """Change the master IP netmask.
783
784 """
785 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
786 params[3])
787
788 @staticmethod
794
795 @staticmethod
797 """Query the list of all logical volume groups.
798
799 """
800 return backend.NodeVolumes()
801
802 @staticmethod
804 """Demote a node from the master candidate role.
805
806 """
807 return backend.DemoteFromMC()
808
809 @staticmethod
811 """Tries to powercycle the node.
812
813 """
814 hypervisor_type = params[0]
815 return backend.PowercycleNode(hypervisor_type)
816
817
818
819 @staticmethod
825
826 @staticmethod
828 """Upload a file.
829
830 Note that the backend implementation imposes strict rules on which
831 files are accepted.
832
833 """
834 return backend.UploadFile(*(params[0]))
835
836 @staticmethod
842
843 @staticmethod
854
855 @staticmethod
863
864 @staticmethod
871
872 @staticmethod
878
879 @staticmethod
886
887
888
889 @staticmethod
891 """Query detailed information about existing OSes.
892
893 """
894 return backend.DiagnoseOS()
895
896 @staticmethod
904
905 @staticmethod
907 """Run a given OS' validation routine.
908
909 """
910 required, name, checks, params = params
911 return backend.ValidateOS(required, name, checks, params)
912
913
914
915 @staticmethod
921
922
923
924 @staticmethod
926 """Run hook scripts.
927
928 """
929 hpath, phase, env = params
930 hr = backend.HooksRunner()
931 return hr.RunHooks(hpath, phase, env)
932
933
934
935 @staticmethod
943
944
945
946 @staticmethod
948 """Run test delay.
949
950 """
951 duration = params[0]
952 status, rval = utils.TestDelay(duration)
953 if not status:
954 raise backend.RPCFail(rval)
955 return rval
956
957
958
959 @staticmethod
961 """Create the file storage directory.
962
963 """
964 file_storage_dir = params[0]
965 return backend.CreateFileStorageDir(file_storage_dir)
966
967 @staticmethod
969 """Remove the file storage directory.
970
971 """
972 file_storage_dir = params[0]
973 return backend.RemoveFileStorageDir(file_storage_dir)
974
975 @staticmethod
977 """Rename the file storage directory.
978
979 """
980 old_file_storage_dir = params[0]
981 new_file_storage_dir = params[1]
982 return backend.RenameFileStorageDir(old_file_storage_dir,
983 new_file_storage_dir)
984
985
986
987 @staticmethod
988 @_RequireJobQueueLock
990 """Update job queue.
991
992 """
993 (file_name, content) = params
994 return backend.JobQueueUpdate(file_name, content)
995
996 @staticmethod
997 @_RequireJobQueueLock
1003
1004 @staticmethod
1005 @_RequireJobQueueLock
1007 """Rename a job queue file.
1008
1009 """
1010
1011 return [backend.JobQueueRename(old, new) for old, new in params[0]]
1012
1013 @staticmethod
1014 @_RequireJobQueueLock
1016 """Set job queue's drain flag.
1017
1018 """
1019 (flag, ) = params
1020
1021 return jstore.SetDrainFlag(flag)
1022
1023
1024
1025 @staticmethod
1027 """Validate the hypervisor parameters.
1028
1029 """
1030 (hvname, hvparams) = params
1031 return backend.ValidateHVParams(hvname, hvparams)
1032
1033
1034
1035 @staticmethod
1037 """Creates a new X509 certificate for SSL/TLS.
1038
1039 """
1040 (validity, ) = params
1041 return backend.CreateX509Certificate(validity)
1042
1043 @staticmethod
1050
1051
1052
1053 @staticmethod
1055 """Starts an import daemon.
1056
1057 """
1058 (opts_s, instance, component, (dest, dest_args)) = params
1059
1060 opts = objects.ImportExportOptions.FromDict(opts_s)
1061
1062 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1063 None, None,
1064 objects.Instance.FromDict(instance),
1065 component, dest,
1066 _DecodeImportExportIO(dest,
1067 dest_args))
1068
1069 @staticmethod
1071 """Starts an export daemon.
1072
1073 """
1074 (opts_s, host, port, instance, component, (source, source_args)) = params
1075
1076 opts = objects.ImportExportOptions.FromDict(opts_s)
1077
1078 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1079 host, port,
1080 objects.Instance.FromDict(instance),
1081 component, source,
1082 _DecodeImportExportIO(source,
1083 source_args))
1084
1085 @staticmethod
1091
1092 @staticmethod
1098
1099 @staticmethod
1105
1108 """Initial checks whether to run or exit with a failure.
1109
1110 """
1111 if args:
1112 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1113 sys.argv[0])
1114 sys.exit(constants.EXIT_FAILURE)
1115 try:
1116 codecs.lookup("string-escape")
1117 except LookupError:
1118 print >> sys.stderr, ("Can't load the string-escape code which is part"
1119 " of the Python installation. Is your installation"
1120 " complete/correct? Aborting.")
1121 sys.exit(constants.EXIT_FAILURE)
1122
1125 """Preparation node daemon function, executed with the PID file held.
1126
1127 """
1128 if options.mlock:
1129 request_executor_class = MlockallRequestExecutor
1130 try:
1131 utils.Mlockall()
1132 except errors.NoCtypesError:
1133 logging.warning("Cannot set memory lock, ctypes module not found")
1134 request_executor_class = http.server.HttpServerRequestExecutor
1135 else:
1136 request_executor_class = http.server.HttpServerRequestExecutor
1137
1138
1139 if options.ssl:
1140 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1141 ssl_cert_path=options.ssl_cert)
1142 else:
1143 ssl_params = None
1144
1145 err = _PrepareQueueLock()
1146 if err is not None:
1147
1148
1149
1150 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1151
1152 handler = NodeRequestHandler()
1153
1154 mainloop = daemon.Mainloop()
1155 server = \
1156 http.server.HttpServer(mainloop, options.bind_address, options.port,
1157 handler, ssl_params=ssl_params, ssl_verify_peer=True,
1158 request_executor_class=request_executor_class)
1159 server.Start()
1160
1161 return (mainloop, server)
1162
1163
def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  Runs the main loop until it returns or raises, then stops the server in
  either case.

  @param options: parsed command line options (not used in this function)
  @param args: remaining command line arguments (not used in this function)
  @param prep_data: C{(mainloop, server)} pair; C{mainloop} must provide
      C{Run()} and C{server} must provide C{Stop()}

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    # Always shut the server down, even if the main loop raised
    server.Stop()
1173
1176 """Main function for the node daemon.
1177
1178 """
1179 parser = OptionParser(description="Ganeti node daemon",
1180 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1181 version="%%prog (ganeti) %s" %
1182 constants.RELEASE_VERSION)
1183 parser.add_option("--no-mlock", dest="mlock",
1184 help="Do not mlock the node memory in ram",
1185 default=True, action="store_false")
1186
1187 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1188 default_ssl_cert=pathutils.NODED_CERT_FILE,
1189 default_ssl_key=pathutils.NODED_CERT_FILE,
1190 console_logging=True)
1191