"""Module with helper classes and functions for daemons"""


import asyncore
import asynchat
import collections
import os
import signal
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way asyncore.loop() does, and the sched module documents raising
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
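

# Illustrative sketch (not taken from a caller in this module): a scheduler
# driven by AsyncoreDelayFunction has to be run in a retry loop, because every
# asyncore event processed inside the delay function surfaces as a
# SchedulerBreakout. Mainloop.Run below follows this pattern; the
# "daemon_should_run" name is a made-up placeholder.
#
#   while daemon_should_run():
#     try:
#       my_scheduler.run()
#     except SchedulerBreakout:
#       # an asyncore event was handled; check for signals and re-enter
#       pass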


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    self._max_delay = None

  def run(self, max_delay=None):
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function is shared by all events in a run, hence the limit
    # has to be stored on the instance
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    """
    if self._max_delay is None:
      timeout = duration
    else:
      timeout = min(duration, self._max_delay.Remaining())

    return AsyncoreDelayFunction(timeout)
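

# Illustrative sketch: scheduling a timed callback with AsyncoreScheduler.
# The callback name and the 5.0/1.0 values are made up for the example.
#
#   scheduler = AsyncoreScheduler(time.time)
#
#   def _PrintTick():
#     logging.info("tick")
#
#   # Run _PrintTick roughly five seconds from now, at priority 1
#   scheduler.enter(5.0, 1, _PrintTick, [])
#
#   try:
#     # Bound each run to one second so the caller can check for signals
#     scheduler.run(max_delay=1.0)
#   except SchedulerBreakout:
#     pass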


class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False


class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # accept() returns no useful peer address for AF_UNIX sockets, so use
        # the peer's credentials instead
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
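

# Illustrative sketch: a minimal stream server built on AsyncStreamServer.
# "EchoServer", "EchoHandler" and the socket path are made-up example names;
# EchoHandler would be a subclass of AsyncTerminatedMessageStream (see the
# sketch after that class below).
#
#   class EchoServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       # one dispatcher per accepted client; asyncore keeps it registered
#       EchoHandler(connected_socket, client_address, "\3", self.family, 10)
#
#   server = EchoServer(socket.AF_UNIX, "/var/run/example-echo.sock")
#   asyncore.loop(use_poll=True)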


class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket

    # Store the family: it's needed later, e.g. when formatting the peer
    # address for logging
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []
    self.receive_count = 0
    self.send_count = 0
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # The receive counter is increased only after checking whether the message
    # can be handled, but before handle_message is actually called
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # Appending to the deque is enough: deque appends are thread-safe, so no
    # explicit locking is needed, and handle_write in the asyncore thread will
    # pick the message up from the output queue
    self.oqueue.append(message)

  # this method is overriding an asynchat.async_chat method
  def readable(self):
    # Only read from the socket if the next message can still be handled
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asynchat.async_chat method
  def writable(self):
    # The output queue has to be checked as well, as send_message only
    # enqueues data without pushing it into the send buffer
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asynchat.async_chat method
  def handle_write(self):
    if self.oqueue:
      # There is data in the output queue: push the first message into the
      # send buffer, together with its terminator
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
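

# Illustrative sketch: a handler echoing every received message back to the
# peer. "EchoHandler" is a made-up name; terminator, family and
# unhandled_limit are passed through by whoever accepts the connection (see
# the AsyncStreamServer sketch above).
#
#   class EchoHandler(AsyncTerminatedMessageStream):
#     def handle_message(self, message, message_id):
#       logging.info("Message %d: %r", message_id, message)
#       # send_message only enqueues the reply; handle_write pushes it out
#       self.send_message(message)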


class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass
  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # IPv6 addresses are 4-tuples; flow info and scope id are not needed
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
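

# Illustrative sketch: a UDP listener built on AsyncUDPSocket. The class name
# and the port are made-up example values.
#
#   class DatagramLogger(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       logging.info("Received %d bytes from %s:%s", len(payload), ip, port)
#
#   listener = DatagramLogger(socket.AF_INET)
#   listener.bind(("0.0.0.0", 5005))
#   # Either let asyncore.loop() drive it, or poll explicitly:
#   while True:
#     listener.process_next_packet(timeout=1.0)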


class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded, when a thread tries to push some
  data to a socket, the main loop handling asynchronous requests might be
  sleeping, waiting on a select(). To avoid this it can create an instance of
  the AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awakened

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # There is a harmless race condition here: at worst more than one wakeup
    # byte is sent, which does no damage
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
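

# Illustrative sketch: waking a sleeping asyncore loop from a worker thread.
# "work_queue" and "_ProcessQueue" are made-up names for the example; the
# callback runs in the thread driving asyncore, not in the caller of signal().
#
#   work_queue = collections.deque()
#
#   def _ProcessQueue():
#     while work_queue:
#       logging.info("Got work item %r", work_queue.popleft())
#
#   awaker = AsyncAwaker(signal_fn=_ProcessQueue)
#
#   # In a worker thread:
#   work_queue.append("some work")
#   awaker.signal()   # forces the main loop's select() to return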


class _ShutdownCheck(object):
  """Logic for L{Mainloop} shutdown.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function returning C{None} if mainloop can be stopped or a
      duration in seconds after which the function should be called again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    self._defer = None

  def CanShutdown(self):
    """Checks whether mainloop can be stopped.

    @rtype: bool

    """
    if self._defer and self._defer.Remaining() > 0:
      # A deferred check has been scheduled and hasn't expired yet
      return False

    # Ask the driver function whether shutdown is possible
    timeout = self._fn()

    if timeout is None:
      # Yes, shutdown can proceed
      return True

    # Defer the next check by the requested amount of time
    self._defer = utils.RunningTimeout(timeout, True)

    return False
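

# Illustrative sketch of the shutdown_wait_fn contract used by _ShutdownCheck
# and L{Mainloop.Run}: return None once shutdown is safe, otherwise a delay in
# seconds after which the check should be repeated. "_JobsPending" is a
# made-up helper.
#
#   def _CheckPendingJobs():
#     if _JobsPending():
#       return 5.0   # ask again in five seconds
#     return None    # nothing left to do, the mainloop may exit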


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received shutdown signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Check whether we can shut down already
        if shutdown_waiter.CanShutdown():
          break

        # Not yet; re-check after a short while
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Repeated signal (or no wait function): shut down immediately
        break

      else:
        # No shutdown signal received yet, wait indefinitely for I/O or a
        # scheduled event
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether any signal handler was called
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
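

# Illustrative sketch: a daemon body built around Mainloop. The callback names
# and the 60-second period are made up; only Mainloop, its scheduler attribute
# and Run() are taken from this module.
#
#   mainloop = Mainloop()
#
#   def _Periodic():
#     logging.info("periodic housekeeping")
#     # sched events are one-shot, so the timer has to be re-armed
#     mainloop.scheduler.enter(60.0, 1, _Periodic, [])
#
#   mainloop.scheduler.enter(60.0, 1, _Periodic, [])
#   mainloop.Run(shutdown_wait_fn=_CheckPendingJobs)   # see the sketch above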


def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run as.

  @param daemon_name: The name of the daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])


def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this will be
  due to socket errors and the like, so we try to format these cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception:
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)


def _HandleSigHup(reopen_fn, signum, frame):
  """Handler for SIGHUP.

  @param reopen_fn: List of callback functions for reopening log files

  """
  # signum and frame are unused, but required by the signal handler signature
  logging.info("Reopening log files after receiving SIGHUP")

  for fn in reopen_fn:
    if fn:
      fn()


def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
    its result will be passed as the third parameter to exec_fn, or
    if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
    runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # If the cluster's primary IP family is IPv6, bind to the IPv6 wildcard
    # address by default
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons the port and bind address can be overridden
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  if options.fork:
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError, err:
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it can go on
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
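

# Illustrative sketch: how a daemon module typically drives GenericMain. The
# daemon picked (noded), the parser and the three callbacks are example
# stand-ins, and the caller would have to import optparse itself; only the
# GenericMain signature is taken from this module.
#
#   def CheckNoded(options, args):
#     if args:
#       print >> sys.stderr, "Got arguments but expects none"
#       sys.exit(constants.EXIT_FAILURE)
#
#   def PrepareNoded(options, args):
#     return None   # whatever exec_fn needs, e.g. a listening server object
#
#   def ExecNoded(options, args, prep_results):
#     mainloop = Mainloop()
#     mainloop.Run()
#
#   def Main():
#     parser = optparse.OptionParser(description="Ganeti node daemon",
#                                    usage="%prog [-f] [-d]")
#     GenericMain(constants.NODED, parser, CheckNoded, PrepareNoded, ExecNoded)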