"""Ganeti node daemon"""


import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer
from ganeti import netutils

import ganeti.http.server


queue_lock = None


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock

  if queue_lock is not None:
    return None

  # Prepare the job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Lock the queue in exclusive, blocking mode; several children can be
    # running at the same time, so wait up to QUEUE_LOCK_TIMEOUT seconds
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


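# Helper for the import/export RPCs below: decode the I/O arguments sent by
# the master back into objects before handing them to the backend. This is a
# minimal sketch, assuming the standard constants.IEIO_RAW_DISK and
# constants.IEIO_SCRIPT argument encodings.
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    # a single serialized disk object
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    # a serialized disk object plus one extra (opaque) argument
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  # all other I/O types need no decoding
  return ieioargs

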
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Custom Request Executor class that ensures NodeHttpServer children are
  locked in ram.

  """
  def __init__(self, *args, **kwargs):
    # Lock all of the process' memory before serving requests, then hand
    # over to the standard executor
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)


class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remember the daemon's PID so request handlers can signal it
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell the parent process to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the exception's arguments, which must already be in the
      # correct (status, payload) tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

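  # RPC handlers. HandleRequest above looks up "perspective_<path>" via
  # getattr, passes it the JSON-decoded request body as "params", and wraps
  # the return value into a (True, result) tuple before serializing it back
  # to the caller; block device RPCs follow below.
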
  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    node_name = netutils.Hostname.GetSysName()

    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in node_disks.get(node_name, [])]

    result = []

    # successful status objects are converted to dicts; failures are passed
    # through unchanged (as error strings)
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

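  # DRBD network management; "params" carries the list of peer node IPs and
  # the serialized DRBD disks to operate on.
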
  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synced.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

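  # Instance export handling
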
  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

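  # Logical volume handling
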
  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

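  # Generic storage interface: "su_name" and "su_args" select and construct
  # the storage backend via storage.GetStorage, and the remaining parameters
  # are handed to its List/Modify/Execute methods.
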
  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

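  # Instance handling; instances arrive as serialized objects.Instance
  # dicts and are deserialized before being passed to the backend.
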
  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

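  # Node-level queries and operations
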
  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
                            live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given IP address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volumes on this node.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

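  # Cluster-wide operations
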
  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

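  # OS definition handling
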
  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

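  # Hook script execution
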
  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

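  # File storage directory handling
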
  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

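  # Job queue file manipulation; these run under the job queue lock via the
  # _RequireJobQueueLock decorator, since several request handlers may touch
  # the queue concurrently.
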
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # each element of params is an (old, new) filename pair
    return [backend.JobQueueRename(old, new) for old, new in params]

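  # Hypervisor parameter validation
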
  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

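  # Import/export daemon handling; the options dict is deserialized into an
  # objects.ImportExportOptions instance and the I/O arguments are decoded
  # with _DecodeImportExportIO before the daemon is started.
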
  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Prepare the SSL parameters, if enabled
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # a failure here (e.g. a permission problem) breaks the job queue
    # RPCs, but it should not prevent the node daemon from starting
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  return (mainloop, server)


def ExecNoded(options, args, prep_data):
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
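

# Entry point when the module is executed as the node daemon script; a
# minimal sketch, assuming the usual "run Main() when invoked directly"
# convention.
if __name__ == "__main__":
  Main()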