31 """Module with helper classes and functions for daemons"""
32
33
34 import asyncore
35 import asynchat
36 import collections
37 import os
38 import signal
39 import logging
40 import sched
41 import time
42 import socket
43 import select
44 import sys
45
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import errors
49 from ganeti import netutils
50 from ganeti import ssconf
51 from ganeti import runtime
52 from ganeti import compat
56 """Exception used to get out of the scheduler loop
57
58 """
59
62 """Asyncore-compatible scheduler delay function.
63
64 This is a delay function for sched that, rather than actually sleeping,
65 executes asyncore events happening in the meantime.
66
67 After an event has occurred, rather than returning, it raises a
68 SchedulerBreakout exception, which will force the current scheduler.run()
69 invocation to terminate, so that we can also check for signals. The main loop
70 will then call the scheduler run again, which will allow it to actually
71 process any due events.
72
73 This is needed because scheduler.run() doesn't support a count=..., as
74 asyncore loop, and the scheduler module documents throwing exceptions from
75 inside the delay function as an allowed usage model.
76
77 """
78 asyncore.loop(timeout=timeout, count=1, use_poll=True)
79 raise SchedulerBreakout()
80
83 """Event scheduler integrated with asyncore
84
85 """
87 """Initializes this class.
88
89 """
90 sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
91 self._max_delay = None
92
93 - def run(self, max_delay=None):
94 """Run any pending events.
95
96 @type max_delay: None or number
97 @param max_delay: Maximum delay (useful if caller has timeouts running)
98
99 """
100 assert self._max_delay is None
101
102
103
104 if max_delay is None:
105 self._max_delay = None
106 else:
107 self._max_delay = utils.RunningTimeout(max_delay, False)
108
109 try:
110 return sched.scheduler.run(self)
111 finally:
112 self._max_delay = None
113
115 """Custom delay function for C{sched.scheduler}.
116
117 """
118 if self._max_delay is None:
119 timeout = duration
120 else:
121 timeout = min(duration, self._max_delay.Remaining())
122
123 return AsyncoreDelayFunction(timeout)
124
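

# Illustrative sketch (not part of the original module): a daemon typically
# uses the scheduler owned by a Mainloop instance (defined below) to register
# periodic work, e.g.:
#
#   def _OnTimer(mainloop):
#     ...  # do periodic work
#     mainloop.scheduler.enter(60, 1, _OnTimer, [mainloop])  # re-arm
#
#   mainloop = Mainloop()
#   mainloop.scheduler.enter(60, 1, _OnTimer, [mainloop])
#
# sched.scheduler.enter(delay, priority, action, argument) is standard library
# API; the delay function plugged in above only changes how the waiting
# between events is performed.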
127 """Base Ganeti Asyncore Dispacher
128
129 """
130
132 """Log an error in handling any request, and proceed.
133
134 """
135 logging.exception("Error while handling asyncore request")
136
137
139 """Most of the time we don't want to check for writability.
140
141 """
142 return False
143
146 """A stream server to use with asyncore.
147
148 Each request is accepted, and then dispatched to a separate asyncore
149 dispatcher to handle.
150
151 """
152
153 _REQUEST_QUEUE_SIZE = 5
154
156 """Constructor for AsyncUnixStreamSocket
157
158 @type family: integer
159 @param family: socket family (one of socket.AF_*)
160 @type address: address family dependent
161 @param address: address to bind the socket to
162
163 """
164 GanetiBaseAsyncoreDispatcher.__init__(self)
165 self.family = family
166 self.create_socket(self.family, socket.SOCK_STREAM)
167 self.set_reuse_addr()
168 self.bind(address)
169 self.listen(self._REQUEST_QUEUE_SIZE)
170
171
173 """Accept a new client connection.
174
175 Creates a new instance of the handler class, which will use asyncore to
176 serve the client.
177
178 """
179 accept_result = utils.IgnoreSignals(self.accept)
180 if accept_result is not None:
181 connected_socket, client_address = accept_result
182 if self.family == socket.AF_UNIX:
183
184
185 client_address = netutils.GetSocketCredentials(connected_socket)
186 logging.info("Accepted connection from %s",
187 netutils.FormatAddress(client_address, family=self.family))
188 self.handle_connection(connected_socket, client_address)
189
191 """Handle an already accepted connection.
192
193 """
194 raise NotImplementedError
195
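

# Illustrative sketch (not part of the original module): a concrete server
# subclasses AsyncStreamServer and overrides handle_connection(), usually
# wrapping the accepted socket in a per-connection dispatcher such as an
# AsyncTerminatedMessageStream subclass (see below):
#
#   class MyServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       MyClientHandler(connected_socket, client_address)  # hypothetical class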
198 """A terminator separated message stream asyncore module.
199
200 Handles a stream connection receiving messages terminated by a defined
201 separator. For each complete message handle_message is called.
202
203 """
204 - def __init__(self, connected_socket, peer_address, terminator, family,
205 unhandled_limit):
206 """AsyncTerminatedMessageStream constructor.
207
208 @type connected_socket: socket.socket
209 @param connected_socket: connected stream socket to receive messages from
210 @param peer_address: family-specific peer address
211 @type terminator: string
212 @param terminator: terminator separating messages in the stream
213 @type family: integer
214 @param family: socket family
215 @type unhandled_limit: integer or None
216 @param unhandled_limit: maximum unanswered messages
217
218 """
219
220
221 asynchat.async_chat.__init__(self, connected_socket)
222 self.connected_socket = connected_socket
223
224
225
226 self.family = family
227 self.peer_address = peer_address
228 self.terminator = terminator
229 self.unhandled_limit = unhandled_limit
230 self.set_terminator(terminator)
231 self.ibuffer = []
232 self.receive_count = 0
233 self.send_count = 0
234 self.oqueue = collections.deque()
235 self.iqueue = collections.deque()
236
237
239 self.ibuffer.append(data)
240
242 return (self.unhandled_limit is None or
243 (self.receive_count < self.send_count + self.unhandled_limit) and
244 not self.iqueue)
245
246
248 message = "".join(self.ibuffer)
249 self.ibuffer = []
250 message_id = self.receive_count
251
252
253 can_handle = self._can_handle_message()
254 self.receive_count += 1
255 if can_handle:
256 self.handle_message(message, message_id)
257 else:
258 self.iqueue.append((message, message_id))
259
261 """Handle a terminated message.
262
263 @type message: string
264 @param message: message to handle
265 @type message_id: integer
266 @param message_id: stream's message sequence number
267
268 """
269 pass
270

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message, the queue is guaranteed not to be empty
    # when writable() gets checked, and messages are sent in the order they
    # were enqueued, so no locking is needed here
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket only if we can handle the next message
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the socket is also writable if we have outgoing messages queued up
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
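

# Illustrative sketch (not part of the original module): a protocol handler
# subclasses AsyncTerminatedMessageStream, overrides handle_message() and
# answers via send_message(); here a newline-terminated echo handler:
#
#   class EchoHandler(AsyncTerminatedMessageStream):
#     def __init__(self, connected_socket, client_address):
#       AsyncTerminatedMessageStream.__init__(self, connected_socket,
#                                             client_address, "\n",
#                                             socket.AF_UNIX, 10)
#
#     def handle_message(self, message, message_id):
#       self.send_message(message)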
335 """An improved asyncore udp socket.
336
337 """
346
347
353
354
356 recv_result = utils.IgnoreSignals(self.recvfrom,
357 constants.MAX_UDP_DATA_SIZE)
358 if recv_result is not None:
359 payload, address = recv_result
360 if self._family == socket.AF_INET6:
361
362 ip, port, _, _ = address
363 else:
364 ip, port = address
365
366 self.handle_datagram(payload, ip, port)
367
369 """Handle an already read udp datagram
370
371 """
372 raise NotImplementedError
373
374
376
377
378 return bool(self._out_queue)
379
380
382 if not self._out_queue:
383 logging.error("handle_write called with empty output queue")
384 return
385 (ip, port, payload) = self._out_queue[0]
386 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
387 self._out_queue.pop(0)
388
397
399 """Process the next datagram, waiting for it if necessary.
400
401 @type timeout: float
402 @param timeout: how long to wait for data
403 @rtype: boolean
404 @return: True if some data has been handled, False otherwise
405
406 """
407 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
408 if result is not None and result & select.POLLIN:
409 self.handle_read()
410 return True
411 else:
412 return False
413
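

# Illustrative sketch (not part of the original module): UDP users subclass
# AsyncUDPSocket, override handle_datagram() and queue replies with
# enqueue_send(); datagrams are then exchanged by the asyncore loop, or by
# calling process_next_packet() explicitly when no loop is running:
#
#   class PingResponder(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       self.enqueue_send(ip, port, "pong")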
416 """A way to notify the asyncore loop that something is going on.
417
418 If an asyncore daemon is multithreaded when a thread tries to push some data
419 to a socket, the main loop handling asynchronous requests might be sleeping
420 waiting on a select(). To avoid this it can create an instance of the
421 AsyncAwaker, which other threads can use to wake it up.
422
423 """
425 """Constructor for AsyncAwaker
426
427 @type signal_fn: function
428 @param signal_fn: function to call when awaken
429
430 """
431 GanetiBaseAsyncoreDispatcher.__init__(self)
432 assert signal_fn is None or callable(signal_fn)
433 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
434 socket.SOCK_STREAM)
435 self.in_socket.setblocking(0)
436 self.in_socket.shutdown(socket.SHUT_WR)
437 self.out_socket.shutdown(socket.SHUT_RD)
438 self.set_socket(self.in_socket)
439 self.need_signal = True
440 self.signal_fn = signal_fn
441 self.connected = True
442
443
445 utils.IgnoreSignals(self.recv, 4096)
446 if self.signal_fn:
447 self.signal_fn()
448 self.need_signal = True
449
450
452 asyncore.dispatcher.close(self)
453 self.out_socket.close()
454
456 """Signal the asyncore main loop.
457
458 Any data we send here will be ignored, but it will cause the select() call
459 to return.
460
461 """
462
463
464 if self.need_signal:
465 self.need_signal = False
466 self.out_socket.send(chr(0))
467
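

# Illustrative sketch (not part of the original module): a worker thread that
# has queued data for an asyncore dispatcher (e.g. via
# AsyncTerminatedMessageStream.send_message) wakes up the main loop so the new
# output is noticed without waiting for the current select() timeout:
#
#   awaker = AsyncAwaker()        # created by the thread running asyncore
#   ...
#   stream.send_message("reply")  # from another thread
#   awaker.signal()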
470 """Logic for L{Mainloop} shutdown.
471
472 """
474 """Initializes this class.
475
476 @type fn: callable
477 @param fn: Function returning C{None} if mainloop can be stopped or a
478 duration in seconds after which the function should be called again
479 @see: L{Mainloop.Run}
480
481 """
482 assert callable(fn)
483
484 self._fn = fn
485 self._defer = None
486
488 """Checks whether mainloop can be stopped.
489
490 @rtype: bool
491
492 """
493 if self._defer and self._defer.Remaining() > 0:
494
495 return False
496
497
498 timeout = self._fn()
499
500 if timeout is None:
501
502 return True
503
504
505 self._defer = utils.RunningTimeout(timeout, True)
506
507 return False
508
509


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide whether we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
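

# Illustrative sketch (not part of the original module): a daemon's exec
# function builds its dispatchers and then hands control to Mainloop.Run();
# an optional shutdown_wait_fn lets it delay termination after the first
# SIGTERM/SIGINT until pending work is done:
#
#   def _CanShutdown(server):                   # hypothetical helper
#     return None if server.idle else 1.0       # None = stop, 1.0 = ask again
#
#   mainloop = Mainloop()
#   server = MyServer(socket.AF_UNIX, "/var/run/my-daemon.sock")  # see above
#   mainloop.Run(shutdown_wait_fn=compat.partial(_CanShutdown, server))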
615 """Verifies the process uid matches the configured uid.
616
617 This method verifies that a daemon is started as the user it is
618 intended to be run
619
620 @param daemon_name: The name of daemon to be started
621 @return: A tuple with the first item indicating success or not,
622 the second item current uid and third with expected uid
623
624 """
625 getents = runtime.GetEnts()
626 running_uid = os.getuid()
627 daemon_uids = {
628 constants.MASTERD: getents.masterd_uid,
629 constants.RAPI: getents.rapi_uid,
630 constants.NODED: getents.noded_uid,
631 constants.CONFD: getents.confd_uid,
632 }
633 assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
634
635 return (daemon_uids[daemon_name] == running_uid, running_uid,
636 daemon_uids[daemon_name])
637
640 """Try to format an error better.
641
642 Since we're dealing with daemon startup errors, in many cases this
643 will be due to socket error and such, so we try to format these cases better.
644
645 @param err: an exception object
646 @rtype: string
647 @return: the formatted error description
648
649 """
650 try:
651 if isinstance(err, socket.error):
652 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
653 elif isinstance(err, EnvironmentError):
654 if err.filename is None:
655 return "%s (errno=%s)" % (err.strerror, err.errno)
656 else:
657 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
658 err.errno)
659 else:
660 return str(err)
661 except Exception:
662 logging.exception("Error while handling existing error %s", err)
663 return "%s" % str(err)
664
667 """Handler for SIGHUP.
668
669 @param reopen_fn: List of callback functions for reopening log files
670
671 """
672 logging.info("Reopening log files after receiving SIGHUP")
673
674 for fn in reopen_fn:
675 if fn:
676 fn()
677
678


def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
    its result will be passed as the third parameter to exec_fn, or
    if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held,
    and runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
711 optionparser.add_option("-f", "--foreground", dest="fork",
712 help="Don't detach from the current terminal",
713 default=True, action="store_false")
714 optionparser.add_option("-d", "--debug", dest="debug",
715 help="Enable some debug messages",
716 default=False, action="store_true")
717 optionparser.add_option("--syslog", dest="syslog",
718 help="Enable logging to syslog (except debug"
719 " messages); one of 'no', 'yes' or 'only' [%s]" %
720 constants.SYSLOG_USAGE,
721 default=constants.SYSLOG_USAGE,
722 choices=["no", "yes", "only"])
723
724 family = ssconf.SimpleStore().GetPrimaryIPFamily()
725
726
727
728 if daemon_name in constants.DAEMONS_PORTS:
729 default_bind_address = constants.IP4_ADDRESS_ANY
730 if family == netutils.IP6Address.family:
731 default_bind_address = constants.IP6_ADDRESS_ANY
732
733 default_port = netutils.GetDaemonPort(daemon_name)
734
735
736 optionparser.add_option("-p", "--port", dest="port",
737 help="Network port (default: %s)" % default_port,
738 default=default_port, type="int")
739 optionparser.add_option("-b", "--bind", dest="bind_address",
740 help=("Bind address (default: '%s')" %
741 default_bind_address),
742 default=default_bind_address, metavar="ADDRESS")
743 optionparser.add_option("-i", "--interface", dest="bind_interface",
744 help=("Bind interface"), metavar="INTERFACE")
745

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "bind_interface", None) is not None:
    if options.bind_address != default_bind_address:
      msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
             (options.bind_address, options.bind_interface))
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    interface_ip_addresses = \
      netutils.GetInterfaceIpAddresses(options.bind_interface)
    if family == netutils.IP6Address.family:
      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
    else:
      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
    if len(if_addresses) < 1:
      msg = "Failed to find IP for interface %s" % options.bind_interface
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    options.bind_address = if_addresses[0]

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  if options.fork:
    # Some libraries open /dev/urandom when they are loaded and misbehave if
    # the corresponding file descriptor is closed behind their back, so any
    # descriptor pointing to /dev/urandom is kept open across daemonization
    # while all other inherited descriptors are closed
    noclose_fds = []
    for fd in os.listdir("/proc/self/fd"):
      try:
        if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom":
          noclose_fds.append(int(fd))
      except EnvironmentError:
        # The descriptor might have been closed in the meantime, or the link
        # could not be read; in either case it can be ignored
        continue

    utils.CloseFDs(noclose_fds=noclose_fds)
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup,
                               [log_reopen_fn, stdio_reopen_fn]))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError, err:
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
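

# Illustrative sketch (not part of the original module): a daemon entry point
# wires its check/prepare/exec callbacks into GenericMain, which handles
# option parsing, daemonization, logging, PID file locking and startup error
# reporting (a real caller passes one of the daemon names known to
# constants.DAEMONS_LOGFILES, e.g. constants.NODED, and imports optparse):
#
#   def CheckMyDaemon(options, args):
#     pass  # exit via sys.exit() if start conditions are not met
#
#   def PrepareMyDaemon(options, args):
#     return MyServer(socket.AF_UNIX, "/var/run/my-daemon.sock")  # see above
#
#   def ExecMyDaemon(options, args, server):
#     Mainloop().Run()
#
#   def Main():
#     parser = optparse.OptionParser(description="my daemon")
#     GenericMain(constants.NODED, parser, CheckMyDaemon, PrepareMyDaemon,
#                 ExecMyDaemon)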