1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module with helper classes and functions for daemons"""
32
33
34 import asyncore
35 import asynchat
36 import collections
37 import os
38 import signal
39 import logging
40 import sched
41 import time
42 import socket
43 import select
44 import sys
45
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import errors
49 from ganeti import netutils
50 from ganeti import ssconf
51 from ganeti import runtime
52 from ganeti import compat
56 """Exception used to get out of the scheduler loop
57
58 """
59
62 """Asyncore-compatible scheduler delay function.
63
64 This is a delay function for sched that, rather than actually sleeping,
65 executes asyncore events happening in the meantime.
66
67 After an event has occurred, rather than returning, it raises a
68 SchedulerBreakout exception, which will force the current scheduler.run()
69 invocation to terminate, so that we can also check for signals. The main loop
70 will then call the scheduler run again, which will allow it to actually
71 process any due events.
72
73 This is needed because scheduler.run() doesn't support a count=..., as
74 asyncore loop, and the scheduler module documents throwing exceptions from
75 inside the delay function as an allowed usage model.
76
77 """
78 asyncore.loop(timeout=timeout, count=1, use_poll=True)
79 raise SchedulerBreakout()
80
83 """Event scheduler integrated with asyncore
84
85 """
87 """Initializes this class.
88
89 """
90 sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
91 self._max_delay = None
92
def run(self, max_delay=None):
    """Process pending scheduler events, optionally bounding the wait.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)
    @return: the result of C{sched.scheduler.run}

    """
    # A limit left over from another invocation would mean run() is being
    # re-entered, which is not supported.
    assert self._max_delay is None

    # The limit is stored on the instance rather than passed along: the
    # scheduler is given a single delay function at construction time, and
    # that function reads the limit back from self._max_delay.
    if max_delay is not None:
        limit = utils.RunningTimeout(max_delay, False)
    else:
        limit = None
    self._max_delay = limit

    try:
        return sched.scheduler.run(self)
    finally:
        # Always reset, also when the delay function raises, so that the
        # next invocation starts from a clean state.
        self._max_delay = None
113
115 """Custom delay function for C{sched.scheduler}.
116
117 """
118 if self._max_delay is None:
119 timeout = duration
120 else:
121 timeout = min(duration, self._max_delay.Remaining())
122
123 return AsyncoreDelayFunction(timeout)
124
127 """Base Ganeti Asyncore Dispatcher
128
129 """
130
132 """Log an error in handling any request, and proceed.
133
134 """
135 logging.exception("Error while handling asyncore request")
136
137
139 """Most of the time we don't want to check for writability.
140
141 """
142 return False
143
146 """A terminator separated message stream asyncore module.
147
148 Handles a stream connection receiving messages terminated by a defined
149 separator. For each complete message handle_message is called.
150
151 """
def __init__(self, connected_socket, peer_address, terminator, family,
             unhandled_limit):
    """Set up a message stream on top of an already connected socket.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # The base class must be initialised first; the socket is handed over
    # as a positional argument.
    asynchat.async_chat.__init__(self, connected_socket)

    # Connection details, kept for later reference
    self.connected_socket = connected_socket
    self.peer_address = peer_address
    self.family = family
    self.unhandled_limit = unhandled_limit

    # Message framing: remember the terminator and register it with asynchat
    self.terminator = terminator
    self.set_terminator(terminator)

    # Receiving side: pieces of the message currently being read, messages
    # waiting to be handled, and the number of messages received so far
    self.ibuffer = []
    self.iqueue = collections.deque()
    self.receive_count = 0

    # Sending side: messages waiting to be sent and the number sent so far
    self.oqueue = collections.deque()
    self.send_count = 0
184
185
187 self.ibuffer.append(data)
188
190 return (self.unhandled_limit is None or
191 (self.receive_count < self.send_count + self.unhandled_limit) and
192 not self.iqueue)
193
194
196 message = "".join(self.ibuffer)
197 self.ibuffer = []
198 message_id = self.receive_count
199
200
201 can_handle = self._can_handle_message()
202 self.receive_count += 1
203 if can_handle:
204 self.handle_message(message, message_id)
205 else:
206 self.iqueue.append((message, message_id))
207
209 """Handle a terminated message.
210
211 @type message: string
212 @param message: message to handle
213 @type message_id: integer
214 @param message_id: stream's message sequence number
215
216 """
217 pass
218
219
220
222 """Send a message to the remote peer. This function is thread-safe.
223
224 @type message: string
225 @param message: message to send, without the terminator
226
227 @warning: If calling this function from a thread different than the one
228 performing the main asyncore loop, remember that you have to wake that one
229 up.
230
231 """
232
233
234
235
236
237 self.oqueue.append(message)
238
239
243
244
246
247
248
249 return asynchat.async_chat.writable(self) or self.oqueue
250
251
253 if self.oqueue:
254
255
256
257 data = self.oqueue.popleft()
258 self.push(data + self.terminator)
259 self.send_count += 1
260 if self.iqueue:
261 self.handle_message(*self.iqueue.popleft())
262 self.initiate_send()
263
268
269
272
273
275 """Log an error in handling any request, and proceed.
276
277 """
278 logging.exception("Error while handling asyncore request")
279 self.close_log()
280
283 """An improved asyncore udp socket.
284
285 """
294
295
301
302
315
317 """Handle an already read udp datagram
318
319 """
320 raise NotImplementedError
321
322
324
325
326 return bool(self._out_queue)
327
328
330 if not self._out_queue:
331 logging.error("handle_write called with empty output queue")
332 return
333 (ip, port, payload) = self._out_queue[0]
334 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
335 self._out_queue.pop(0)
336
345
347 """Process the next datagram, waiting for it if necessary.
348
349 @type timeout: float
350 @param timeout: how long to wait for data
351 @rtype: boolean
352 @return: True if some data has been handled, False otherwise
353
354 """
355 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
356 if result is not None and result & select.POLLIN:
357 self.handle_read()
358 return True
359 else:
360 return False
361
364 """A way to notify the asyncore loop that something is going on.
365
366 If an asyncore daemon is multithreaded when a thread tries to push some data
367 to a socket, the main loop handling asynchronous requests might be sleeping
368 waiting on a select(). To avoid this it can create an instance of the
369 AsyncAwaker, which other threads can use to wake it up.
370
371 """
373 """Constructor for AsyncAwaker
374
375 @type signal_fn: function
376 @param signal_fn: function to call when awakened
377
378 """
379 GanetiBaseAsyncoreDispatcher.__init__(self)
380 assert signal_fn is None or callable(signal_fn)
381 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
382 socket.SOCK_STREAM)
383 self.in_socket.setblocking(0)
384 self.in_socket.shutdown(socket.SHUT_WR)
385 self.out_socket.shutdown(socket.SHUT_RD)
386 self.set_socket(self.in_socket)
387 self.need_signal = True
388 self.signal_fn = signal_fn
389 self.connected = True
390
391
393 utils.IgnoreSignals(self.recv, 4096)
394 if self.signal_fn:
395 self.signal_fn()
396 self.need_signal = True
397
398
400 asyncore.dispatcher.close(self)
401 self.out_socket.close()
402
404 """Signal the asyncore main loop.
405
406 Any data we send here will be ignored, but it will cause the select() call
407 to return.
408
409 """
410
411
412 if self.need_signal:
413 self.need_signal = False
414 self.out_socket.send(chr(0))
415
418 """Logic for L{Mainloop} shutdown.
419
420 """
422 """Initializes this class.
423
424 @type fn: callable
425 @param fn: Function returning C{None} if mainloop can be stopped or a
426 duration in seconds after which the function should be called again
427 @see: L{Mainloop.Run}
428
429 """
430 assert callable(fn)
431
432 self._fn = fn
433 self._defer = None
434
436 """Checks whether mainloop can be stopped.
437
438 @rtype: bool
439
440 """
441 if self._defer and self._defer.Remaining() > 0:
442
443 return False
444
445
446 timeout = self._fn()
447
448 if timeout is None:
449
450 return True
451
452
453 self._defer = utils.RunningTimeout(timeout, True)
454
455 return False
456
457
class Mainloop(object):
    """Generic mainloop for daemons.

    Alternates between waiting for asyncore I/O events, running scheduled
    events and dispatching received signals to registered receivers.

    @ivar scheduler: A sched.scheduler object, which can be used to register
        timed events

    """
    _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

    def __init__(self):
        """Creates the scheduler and an empty list of signal receivers.

        """
        self._signal_wait = []
        self.scheduler = AsyncoreScheduler(time.time)

        # Called for its side effects only, the result is not used here.
        # NOTE(review): presumably pre-resolves the user/group entries used
        # by the daemons; confirm against runtime.GetEnts.
        runtime.GetEnts()

    @utils.SignalHandled([signal.SIGCHLD])
    @utils.SignalHandled([signal.SIGTERM])
    @utils.SignalHandled([signal.SIGINT])
    def Run(self, shutdown_wait_fn=None, signal_handlers=None):
        """Runs the mainloop until a shutdown has been requested.

        @type shutdown_wait_fn: callable
        @param shutdown_wait_fn: Function to check whether loop can be
            terminated; B{important}: function must be idempotent and must
            return either None for shutting down or a timeout for another
            call
        @type signal_handlers: dict
        @param signal_handlers: signal->L{utils.SignalHandler} passed by
            decorator

        """
        assert (isinstance(signal_handlers, dict) and
                len(signal_handlers) > 0), "Broken SignalHandled decorator"

        # Number of shutdown requests (SIGTERM/SIGINT) seen so far
        shutdown_signals = 0

        # Created lazily once the first shutdown request has arrived
        shutdown_waiter = None

        while True:
            if shutdown_signals == 0:
                # Nothing asked us to stop: no upper bound on the wait
                timeout = None
            elif shutdown_signals == 1 and shutdown_wait_fn is not None:
                if shutdown_waiter is None:
                    shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

                if shutdown_waiter.CanShutdown():
                    break

                # Not yet allowed to stop; check again after at most this
                # long
                timeout = 1.0
            else:
                # Either a repeated shutdown request or no callback to
                # consult
                break

            if self.scheduler.empty():
                asyncore.loop(count=1, timeout=timeout, use_poll=True)
            else:
                try:
                    self.scheduler.run(max_delay=timeout)
                except SchedulerBreakout:
                    # Expected way out of the scheduler; fall through to the
                    # signal check below
                    pass

            # Dispatch every signal that fired since the last iteration
            for (sig, handler) in signal_handlers.items():
                if not handler.called:
                    continue

                self._CallSignalWaiters(sig)
                if sig in (signal.SIGTERM, signal.SIGINT):
                    logging.info("Received signal %s asking for shutdown",
                                 sig)
                    shutdown_signals += 1
                handler.Clear()

    def _CallSignalWaiters(self, signum):
        """Notifies every registered receiver about a signal.

        @type signum: int
        @param signum: Signal number

        """
        for receiver in self._signal_wait:
            receiver.OnSignal(signum)

    def RegisterSignal(self, owner):
        """Registers a receiver for signal notifications

        The receiver must support a "OnSignal(self, signum)" function.

        @type owner: instance
        @param owner: Receiver

        """
        self._signal_wait.append(owner)
560
563 """Verifies the process uid matches the configured uid.
564
565 This method verifies that a daemon is started as the user it is
566 intended to be run
567
568 @param daemon_name: The name of daemon to be started
569 @return: A tuple with the first item indicating success or not,
570 the second item current uid and third with expected uid
571
572 """
573 getents = runtime.GetEnts()
574 running_uid = os.getuid()
575 daemon_uids = {
576 constants.MASTERD: getents.masterd_uid,
577 constants.RAPI: getents.rapi_uid,
578 constants.NODED: getents.noded_uid,
579 constants.CONFD: getents.confd_uid,
580 }
581 assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
582
583 return (daemon_uids[daemon_name] == running_uid, running_uid,
584 daemon_uids[daemon_name])
585
588 """Try to format an error better.
589
590 Since we're dealing with daemon startup errors, in many cases this
591 will be due to socket error and such, so we try to format these cases better.
592
593 @param err: an exception object
594 @rtype: string
595 @return: the formatted error description
596
597 """
598 try:
599 if isinstance(err, socket.error):
600 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
601 elif isinstance(err, EnvironmentError):
602 if err.filename is None:
603 return "%s (errno=%s)" % (err.strerror, err.errno)
604 else:
605 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
606 err.errno)
607 else:
608 return str(err)
609 except Exception:
610 logging.exception("Error while handling existing error %s", err)
611 return "%s" % str(err)
612
615 """Handler for SIGHUP.
616
617 @param reopen_fn: List of callback functions for reopening log files
618
619 """
620 logging.info("Reopening log files after receiving SIGHUP")
621
622 for fn in reopen_fn:
623 if fn:
624 fn()
625
626
627 -def GenericMain(daemon_name, optionparser,
628 check_fn, prepare_fn, exec_fn,
629 multithreaded=False, console_logging=False,
630 default_ssl_cert=None, default_ssl_key=None,
631 warn_breach=False):
632 """Shared main function for daemons.
633
634 @type daemon_name: string
635 @param daemon_name: daemon name
636 @type optionparser: optparse.OptionParser
637 @param optionparser: initialized optionparser with daemon-specific options
638 (common -f -d options will be handled by this module)
639 @type check_fn: function which accepts (options, args)
640 @param check_fn: function that checks start conditions and exits if they're
641 not met
642 @type prepare_fn: function which accepts (options, args)
643 @param prepare_fn: function that is run before forking, or None;
644 it's result will be passed as the third parameter to exec_fn, or
645 if None was passed in, we will just pass None to exec_fn
646 @type exec_fn: function which accepts (options, args, prepare_results)
647 @param exec_fn: function that's executed with the daemon's pid file held, and
648 runs the daemon itself.
649 @type multithreaded: bool
650 @param multithreaded: Whether the daemon uses threads
651 @type console_logging: boolean
652 @param console_logging: if True, the daemon will fall back to the system
653 console if logging fails
654 @type default_ssl_cert: string
655 @param default_ssl_cert: Default SSL certificate path
656 @type default_ssl_key: string
657 @param default_ssl_key: Default SSL key path
658 @type warn_breach: bool
659 @param warn_breach: issue a warning at daemon launch time, before
660 daemonizing, about the possibility of breaking parameter privacy
661 invariants through the otherwise helpful debug logging.
662
663 """
664 optionparser.add_option("-f", "--foreground", dest="fork",
665 help="Don't detach from the current terminal",
666 default=True, action="store_false")
667 optionparser.add_option("-d", "--debug", dest="debug",
668 help="Enable some debug messages",
669 default=False, action="store_true")
670 optionparser.add_option("--syslog", dest="syslog",
671 help="Enable logging to syslog (except debug"
672 " messages); one of 'no', 'yes' or 'only' [%s]" %
673 constants.SYSLOG_USAGE,
674 default=constants.SYSLOG_USAGE,
675 choices=["no", "yes", "only"])
676
677 family = ssconf.SimpleStore().GetPrimaryIPFamily()
678
679
680
681 if daemon_name in constants.DAEMONS_PORTS:
682 default_bind_address = constants.IP4_ADDRESS_ANY
683 if family == netutils.IP6Address.family:
684 default_bind_address = constants.IP6_ADDRESS_ANY
685
686 default_port = netutils.GetDaemonPort(daemon_name)
687
688
689 optionparser.add_option("-p", "--port", dest="port",
690 help="Network port (default: %s)" % default_port,
691 default=default_port, type="int")
692 optionparser.add_option("-b", "--bind", dest="bind_address",
693 help=("Bind address (default: '%s')" %
694 default_bind_address),
695 default=default_bind_address, metavar="ADDRESS")
696 optionparser.add_option("-i", "--interface", dest="bind_interface",
697 help=("Bind interface"), metavar="INTERFACE")
698
699 if default_ssl_key is not None and default_ssl_cert is not None:
700 optionparser.add_option("--no-ssl", dest="ssl",
701 help="Do not secure HTTP protocol with SSL",
702 default=True, action="store_false")
703 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
704 help=("SSL key path (default: %s)" %
705 default_ssl_key),
706 default=default_ssl_key, type="string",
707 metavar="SSL_KEY_PATH")
708 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
709 help=("SSL certificate path (default: %s)" %
710 default_ssl_cert),
711 default=default_ssl_cert, type="string",
712 metavar="SSL_CERT_PATH")
713
714
715 if multithreaded:
716 utils.DisableFork()
717
718 options, args = optionparser.parse_args()
719
720 if getattr(options, "bind_interface", None) is not None:
721 if options.bind_address != default_bind_address:
722 msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
723 (options.bind_address, options.bind_interface))
724 print >> sys.stderr, msg
725 sys.exit(constants.EXIT_FAILURE)
726 interface_ip_addresses = \
727 netutils.GetInterfaceIpAddresses(options.bind_interface)
728 if family == netutils.IP6Address.family:
729 if_addresses = interface_ip_addresses[constants.IP6_VERSION]
730 else:
731 if_addresses = interface_ip_addresses[constants.IP4_VERSION]
732 if len(if_addresses) < 1:
733 msg = "Failed to find IP for interface %s" % options.bind_interace
734 print >> sys.stderr, msg
735 sys.exit(constants.EXIT_FAILURE)
736 options.bind_address = if_addresses[0]
737
738 if getattr(options, "ssl", False):
739 ssl_paths = {
740 "certificate": options.ssl_cert,
741 "key": options.ssl_key,
742 }
743
744 for name, path in ssl_paths.iteritems():
745 if not os.path.isfile(path):
746 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
747 sys.exit(constants.EXIT_FAILURE)
748
749
750
751
752
753 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
754 if not result:
755 msg = ("%s started using wrong user ID (%d), expected %d" %
756 (daemon_name, running_uid, expected_uid))
757 print >> sys.stderr, msg
758 sys.exit(constants.EXIT_FAILURE)
759
760 if check_fn is not None:
761 check_fn(options, args)
762
763 log_filename = constants.DAEMONS_LOGFILES[daemon_name]
764
765
766 if options.debug and warn_breach:
767 sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name)
768
769 if options.fork:
770
771
772
773
774
775
776 noclose_fds = []
777 for fd in os.listdir("/proc/self/fd"):
778 try:
779 if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom":
780 noclose_fds.append(int(fd))
781 except EnvironmentError:
782
783
784 continue
785
786 utils.CloseFDs(noclose_fds=noclose_fds)
787 (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
788 else:
789 (wpipe, stdio_reopen_fn) = (None, None)
790
791 log_reopen_fn = \
792 utils.SetupLogging(log_filename, daemon_name,
793 debug=options.debug,
794 stderr_logging=not options.fork,
795 multithreaded=multithreaded,
796 syslog=options.syslog,
797 console_logging=console_logging)
798
799
800 signal.signal(signal.SIGHUP,
801 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
802
803 try:
804 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
805 except errors.PidFileLockError, err:
806 print >> sys.stderr, "Error while locking PID file:\n%s" % err
807 sys.exit(constants.EXIT_FAILURE)
808
809 try:
810 try:
811 logging.info("%s daemon startup", daemon_name)
812 if callable(prepare_fn):
813 prep_results = prepare_fn(options, args)
814 else:
815 prep_results = None
816 except Exception, err:
817 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
818 raise
819
820 if wpipe is not None:
821
822
823 os.close(wpipe)
824
825 exec_fn(options, args, prep_results)
826 finally:
827 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
828