31 """Module with helper classes and functions for daemons"""
32
33
34 import asyncore
35 import asynchat
36 import collections
37 import os
38 import signal
39 import logging
40 import sched
41 import time
42 import socket
43 import select
44 import sys
45
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import errors
49 from ganeti import netutils
50 from ganeti import ssconf
51 from ganeti import runtime
52 from ganeti import compat
56 """Exception used to get out of the scheduler loop
57
58 """
59
62 """Asyncore-compatible scheduler delay function.
63
64 This is a delay function for sched that, rather than actually sleeping,
65 executes asyncore events happening in the meantime.
66
67 After an event has occurred, rather than returning, it raises a
68 SchedulerBreakout exception, which will force the current scheduler.run()
69 invocation to terminate, so that we can also check for signals. The main loop
70 will then call the scheduler run again, which will allow it to actually
71 process any due events.
72
73 This is needed because scheduler.run() doesn't support a count=..., as
74 asyncore loop, and the scheduler module documents throwing exceptions from
75 inside the delay function as an allowed usage model.
76
77 """
78 asyncore.loop(timeout=timeout, count=1, use_poll=True)
79 raise SchedulerBreakout()
80
83 """Event scheduler integrated with asyncore
84
85 """
87 """Initializes this class.
88
89 """
90 sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
91 self._max_delay = None
92
93 - def run(self, max_delay=None):
94 """Run any pending events.
95
96 @type max_delay: None or number
97 @param max_delay: Maximum delay (useful if caller has timeouts running)
98
99 """
100 assert self._max_delay is None
101
102
103
104 if max_delay is None:
105 self._max_delay = None
106 else:
107 self._max_delay = utils.RunningTimeout(max_delay, False)
108
109 try:
110 return sched.scheduler.run(self)
111 finally:
112 self._max_delay = None
113
115 """Custom delay function for C{sched.scheduler}.
116
117 """
118 if self._max_delay is None:
119 timeout = duration
120 else:
121 timeout = min(duration, self._max_delay.Remaining())
122
123 return AsyncoreDelayFunction(timeout)
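

# Example (a minimal sketch, not part of the original module): how the pieces
# above fit together. L{Mainloop.Run} below does essentially the same thing,
# alternating between pending timed events and plain asyncore I/O; the
# callback argument here is hypothetical and supplied by the caller. It also
# assumes at least one asyncore dispatcher is registered, otherwise the loop
# would poll in a tight cycle.
def _ExampleSchedulerLoop(callback, delay=30.0, max_delay=1.0):
  scheduler = AsyncoreScheduler(time.time)
  # sched.scheduler.enter(delay, priority, action, argument)
  scheduler.enter(delay, 1, callback, [])
  while not scheduler.empty():
    try:
      # Serves asyncore events while waiting; AsyncoreDelayFunction raises
      # SchedulerBreakout after each served event, giving the caller a chance
      # to check for signals here before calling run() again
      scheduler.run(max_delay=max_delay)
    except SchedulerBreakout:
      pass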


class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
146 """A stream server to use with asyncore.
147
148 Each request is accepted, and then dispatched to a separate asyncore
149 dispatcher to handle.
150
151 """
152
153 _REQUEST_QUEUE_SIZE = 5
154
156 """Constructor for AsyncStreamServer
157
158 @type family: integer
159 @param family: socket family (one of socket.AF_*)
160 @type address: address family dependent
161 @param address: address to bind the socket to
162
163 """
164 GanetiBaseAsyncoreDispatcher.__init__(self)
165 self.family = family
166 self.create_socket(self.family, socket.SOCK_STREAM)
167 self.set_reuse_addr()
168 self.bind(address)
169 self.listen(self._REQUEST_QUEUE_SIZE)
170
171
173 """Accept a new client connection.
174
175 Creates a new instance of the handler class, which will use asyncore to
176 serve the client.
177
178 """
179 accept_result = utils.IgnoreSignals(self.accept)
180 if accept_result is not None:
181 connected_socket, client_address = accept_result
182 if self.family == socket.AF_UNIX:
183
184
185 client_address = netutils.GetSocketCredentials(connected_socket)
186 logging.info("Accepted connection from %s",
187 netutils.FormatAddress(client_address, family=self.family))
188 self.handle_connection(connected_socket, client_address)
189
191 """Handle an already accepted connection.
192
193 """
194 raise NotImplementedError
195
198 """A terminator separated message stream asyncore module.
199
200 Handles a stream connection receiving messages terminated by a defined
201 separator. For each complete message handle_message is called.
202
203 """
204 - def __init__(self, connected_socket, peer_address, terminator, family,
205 unhandled_limit):
206 """AsyncTerminatedMessageStream constructor.
207
208 @type connected_socket: socket.socket
209 @param connected_socket: connected stream socket to receive messages from
210 @param peer_address: family-specific peer address
211 @type terminator: string
212 @param terminator: terminator separating messages in the stream
213 @type family: integer
214 @param family: socket family
215 @type unhandled_limit: integer or None
216 @param unhandled_limit: maximum unanswered messages
217
218 """
219
220
221 asynchat.async_chat.__init__(self, connected_socket)
222 self.connected_socket = connected_socket
223
224
225
226 self.family = family
227 self.peer_address = peer_address
228 self.terminator = terminator
229 self.unhandled_limit = unhandled_limit
230 self.set_terminator(terminator)
231 self.ibuffer = []
232 self.receive_count = 0
233 self.send_count = 0
234 self.oqueue = collections.deque()
235 self.iqueue = collections.deque()
236
237
239 self.ibuffer.append(data)
240

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # receive_count is increased after checking whether the message can be
    # handled, but before actually calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))
261 """Handle a terminated message.
262
263 @type message: string
264 @param message: message to handle
265 @type message_id: integer
266 @param message_id: stream's message sequence number
267
268 """
269 pass
270
271
272
274 """Send a message to the remote peer. This function is thread-safe.
275
276 @type message: string
277 @param message: message to send, without the terminator
278
279 @warning: If calling this function from a thread different than the one
280 performing the main asyncore loop, remember that you have to wake that one
281 up.
282
283 """
284
285
286
287
288
289 self.oqueue.append(message)
290
291
295
296
298
299
300
301 return asynchat.async_chat.writable(self) or self.oqueue
302
303
305 if self.oqueue:
306
307
308
309 data = self.oqueue.popleft()
310 self.push(data + self.terminator)
311 self.send_count += 1
312 if self.iqueue:
313 self.handle_message(*self.iqueue.popleft())
314 self.initiate_send()
315
320
321
324
325
327 """Log an error in handling any request, and proceed.
328
329 """
330 logging.exception("Error while handling asyncore request")
331 self.close_log()
332
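

# Example (a minimal sketch, not part of the original module): a trivial echo
# service built from the two classes above. The _ExampleEcho* names, the
# newline terminator and the unhandled_limit of one are all hypothetical;
# they only show how handle_connection and handle_message are meant to be
# overridden.
class _ExampleEchoStream(AsyncTerminatedMessageStream):
  def handle_message(self, message, message_id):
    # send each received line straight back; send_message only queues the
    # reply, the asyncore loop pushes it out once the socket is writable
    self.send_message(message)


class _ExampleEchoServer(AsyncStreamServer):
  def handle_connection(self, connected_socket, client_address):
    # one dispatcher per client; asyncore keeps it alive via its socket map
    _ExampleEchoStream(connected_socket, client_address, "\n", self.family, 1)

# Usage would look roughly like:
#   _ExampleEchoServer(socket.AF_INET, ("127.0.0.1", 12345))
#   asyncore.loop(use_poll=True)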
335 """An improved asyncore udp socket.
336
337 """
346
347
353
354
367
369 """Handle an already read udp datagram
370
371 """
372 raise NotImplementedError
373
374
376
377
378 return bool(self._out_queue)
379
380
382 if not self._out_queue:
383 logging.error("handle_write called with empty output queue")
384 return
385 (ip, port, payload) = self._out_queue[0]
386 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
387 self._out_queue.pop(0)
388
397
399 """Process the next datagram, waiting for it if necessary.
400
401 @type timeout: float
402 @param timeout: how long to wait for data
403 @rtype: boolean
404 @return: True if some data has been handled, False otherwise
405
406 """
407 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
408 if result is not None and result & select.POLLIN:
409 self.handle_read()
410 return True
411 else:
412 return False
413
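

# Example (a minimal sketch, not part of the original module): a minimal UDP
# responder built on AsyncUDPSocket. The class name and the "pong" payload are
# hypothetical and only illustrate how handle_datagram and enqueue_send work
# together.
class _ExamplePingResponder(AsyncUDPSocket):
  def handle_datagram(self, payload, ip, port):
    # queue a fixed answer; the actual sendto() happens later, from
    # handle_write, once the socket reports itself as writable
    self.enqueue_send(ip, port, "pong")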
416 """A way to notify the asyncore loop that something is going on.
417
418 If an asyncore daemon is multithreaded when a thread tries to push some data
419 to a socket, the main loop handling asynchronous requests might be sleeping
420 waiting on a select(). To avoid this it can create an instance of the
421 AsyncAwaker, which other threads can use to wake it up.
422
423 """
425 """Constructor for AsyncAwaker
426
427 @type signal_fn: function
428 @param signal_fn: function to call when awaken
429
430 """
431 GanetiBaseAsyncoreDispatcher.__init__(self)
432 assert signal_fn is None or callable(signal_fn)
433 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
434 socket.SOCK_STREAM)
435 self.in_socket.setblocking(0)
436 self.in_socket.shutdown(socket.SHUT_WR)
437 self.out_socket.shutdown(socket.SHUT_RD)
438 self.set_socket(self.in_socket)
439 self.need_signal = True
440 self.signal_fn = signal_fn
441 self.connected = True
442
443
445 utils.IgnoreSignals(self.recv, 4096)
446 if self.signal_fn:
447 self.signal_fn()
448 self.need_signal = True
449
450
452 asyncore.dispatcher.close(self)
453 self.out_socket.close()
454
456 """Signal the asyncore main loop.
457
458 Any data we send here will be ignored, but it will cause the select() call
459 to return.
460
461 """
462
463
464 if self.need_signal:
465 self.need_signal = False
466 self.out_socket.send(chr(0))
467
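

# Example (a minimal sketch, not part of the original module): how a worker
# thread hands a reply back to the asyncore thread. The function name is
# hypothetical; "stream" is an AsyncTerminatedMessageStream and "awaker" an
# AsyncAwaker owned by the thread running the asyncore loop.
def _ExampleQueueReplyFromWorker(stream, awaker, message):
  stream.send_message(message)  # thread-safe, only appends to a deque
  awaker.signal()  # wake up the select() in the main asyncore loop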
470 """Logic for L{Mainloop} shutdown.
471
472 """
474 """Initializes this class.
475
476 @type fn: callable
477 @param fn: Function returning C{None} if mainloop can be stopped or a
478 duration in seconds after which the function should be called again
479 @see: L{Mainloop.Run}
480
481 """
482 assert callable(fn)
483
484 self._fn = fn
485 self._defer = None
486
488 """Checks whether mainloop can be stopped.
489
490 @rtype: bool
491
492 """
493 if self._defer and self._defer.Remaining() > 0:
494
495 return False
496
497
498 timeout = self._fn()
499
500 if timeout is None:
501
502 return True
503
504
505 self._defer = utils.RunningTimeout(timeout, True)
506
507 return False
508
509
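

# Example (a minimal sketch, not part of the original module): the kind of
# function L{Mainloop.Run} accepts as shutdown_wait_fn and wraps in
# _ShutdownCheck. The job-queue object and its HasPendingJobs method are
# hypothetical.
def _ExampleShutdownWaitFn(queue):
  # Returning None means "safe to stop now"; returning a number means "ask
  # again after that many seconds". The function must be idempotent.
  if queue.HasPendingJobs():
    return 5.0
  return None

# A daemon would pass it in bound to its queue, e.g.:
#   mainloop.Run(shutdown_wait_fn=compat.partial(_ExampleShutdownWaitFn, queue))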


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve and cache the uids/gids used by the Ganeti daemons
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received shutdown signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let the checking function decide whether it is safe to stop
        if shutdown_waiter.CanShutdown():
          break

        # Check again shortly
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Either a second signal was received or there is no waiting function;
        # stop unconditionally
        break

      else:
        # No shutdown signal received yet, wait as long as needed for events
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether any signal was received and dispatch it
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
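

# Example (a minimal sketch, not part of the original module): the typical
# shape of a daemon body built around Mainloop, combining a periodic scheduler
# job with a signal receiver. All _Example names are hypothetical.
class _ExampleSignalReceiver(object):
  def OnSignal(self, signum):
    logging.info("Mainloop saw signal %s", signum)


def _ExampleDaemonBody():
  mainloop = Mainloop()
  mainloop.RegisterSignal(_ExampleSignalReceiver())

  def _Periodic():
    logging.debug("Periodic housekeeping")
    # re-register so the job keeps firing roughly every 60 seconds
    mainloop.scheduler.enter(60.0, 1, _Periodic, [])

  mainloop.scheduler.enter(60.0, 1, _Periodic, [])
  mainloop.Run()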
615 """Verifies the process uid matches the configured uid.
616
617 This method verifies that a daemon is started as the user it is
618 intended to be run
619
620 @param daemon_name: The name of daemon to be started
621 @return: A tuple with the first item indicating success or not,
622 the second item current uid and third with expected uid
623
624 """
625 getents = runtime.GetEnts()
626 running_uid = os.getuid()
627 daemon_uids = {
628 constants.MASTERD: getents.masterd_uid,
629 constants.RAPI: getents.rapi_uid,
630 constants.NODED: getents.noded_uid,
631 constants.CONFD: getents.confd_uid,
632 }
633 assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
634
635 return (daemon_uids[daemon_name] == running_uid, running_uid,
636 daemon_uids[daemon_name])
637
640 """Try to format an error better.
641
642 Since we're dealing with daemon startup errors, in many cases this
643 will be due to socket error and such, so we try to format these cases better.
644
645 @param err: an exception object
646 @rtype: string
647 @return: the formatted error description
648
649 """
650 try:
651 if isinstance(err, socket.error):
652 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
653 elif isinstance(err, EnvironmentError):
654 if err.filename is None:
655 return "%s (errno=%s)" % (err.strerror, err.errno)
656 else:
657 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
658 err.errno)
659 else:
660 return str(err)
661 except Exception:
662 logging.exception("Error while handling existing error %s", err)
663 return "%s" % str(err)
664
667 """Handler for SIGHUP.
668
669 @param reopen_fn: List of callback functions for reopening log files
670
671 """
672 logging.info("Reopening log files after receiving SIGHUP")
673
674 for fn in reopen_fn:
675 if fn:
676 fn()
677
678


def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                warn_breach=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      its result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type warn_breach: bool
  @param warn_breach: issue a warning at daemon launch time, before
      daemonizing, about the possibility of breaking parameter privacy
      invariants through the otherwise helpful debug logging.

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  # this defaults to IPv4 when the ssconf data is not available (yet)
  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")
    optionparser.add_option("-i", "--interface", dest="bind_interface",
                            help=("Bind interface"), metavar="INTERFACE")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "bind_interface", None) is not None:
    if options.bind_address != default_bind_address:
      msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
             (options.bind_address, options.bind_interface))
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    interface_ip_addresses = \
      netutils.GetInterfaceIpAddresses(options.bind_interface)
    if family == netutils.IP6Address.family:
      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
    else:
      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
    if len(if_addresses) < 1:
      msg = "Failed to find IP for interface %s" % options.bind_interface
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    options.bind_address = if_addresses[0]

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

  # Refuse to start unless running under the uid this daemon is meant to use
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  # debug logging may leak confidential parameters into the log files; warn
  # the administrator before daemonizing if the daemon asked for it
  if options.debug and warn_breach:
    sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name)

  if options.fork:
    # Some libraries (e.g. newer GnuTLS) open /dev/urandom when they are
    # loaded, long before the daemon forks, and misbehave if that descriptor
    # is closed behind their back. Keep any file descriptor referring to
    # /dev/urandom open across the daemonization.
    noclose_fds = []
    for fd in os.listdir("/proc/self/fd"):
      try:
        if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom":
          noclose_fds.append(int(fd))
      except EnvironmentError:
        # The fd might have disappeared in the meantime, ignore it
        continue

    utils.CloseFDs(noclose_fds=noclose_fds)
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError, err:
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, close the pipe to let the
      # parent process know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
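

# Example (a minimal sketch, not part of the original module): how a daemon's
# main() is usually wired around GenericMain. The daemon name, parser and the
# three callbacks are hypothetical placeholders; a real daemon passes one of
# the names known to constants.DAEMONS_LOGFILES (e.g. constants.NODED).
def _ExampleDaemonMain():
  import optparse
  parser = optparse.OptionParser(usage="%prog [-f] [-d]")

  def _CheckFn(options, args):
    if args:
      print >> sys.stderr, "This daemon accepts no arguments"
      sys.exit(constants.EXIT_FAILURE)

  def _PrepareFn(options, args):
    # build sockets and the mainloop before forking; the return value is
    # handed to _ExecFn as its third argument
    return Mainloop()

  def _ExecFn(options, args, mainloop):
    mainloop.Run()

  GenericMain("example-daemon", parser, _CheckFn, _PrepareFn, _ExecFn)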