22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import os
29 import signal
30 import logging
31 import sched
32 import time
33 import socket
34 import select
35 import sys
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43 from ganeti import compat
47 """Exception used to get out of the scheduler loop
48
49 """
50
53 """Asyncore-compatible scheduler delay function.
54
55 This is a delay function for sched that, rather than actually sleeping,
56 executes asyncore events happening in the meantime.
57
58 After an event has occurred, rather than returning, it raises a
59 SchedulerBreakout exception, which will force the current scheduler.run()
60 invocation to terminate, so that we can also check for signals. The main loop
61 will then call the scheduler run again, which will allow it to actually
62 process any due events.
63
64 This is needed because scheduler.run() doesn't support a count=..., as
65 asyncore loop, and the scheduler module documents throwing exceptions from
66 inside the delay function as an allowed usage model.
67
68 """
69 asyncore.loop(timeout=timeout, count=1, use_poll=True)
70 raise SchedulerBreakout()
71
74 """Event scheduler integrated with asyncore
75
76 """
79
82 """Base Ganeti Asyncore Dispacher
83
84 """
85
87 """Log an error in handling any request, and proceed.
88
89 """
90 logging.exception("Error while handling asyncore request")
91
92
94 """Most of the time we don't want to check for writability.
95
96 """
97 return False
98
101 """A stream server to use with asyncore.
102
103 Each request is accepted, and then dispatched to a separate asyncore
104 dispatcher to handle.
105
106 """
107
108 _REQUEST_QUEUE_SIZE = 5
109
111 """Constructor for AsyncUnixStreamSocket
112
113 @type family: integer
114 @param family: socket family (one of socket.AF_*)
115 @type address: address family dependent
116 @param address: address to bind the socket to
117
118 """
119 GanetiBaseAsyncoreDispatcher.__init__(self)
120 self.family = family
121 self.create_socket(self.family, socket.SOCK_STREAM)
122 self.set_reuse_addr()
123 self.bind(address)
124 self.listen(self._REQUEST_QUEUE_SIZE)
125
126
128 """Accept a new client connection.
129
130 Creates a new instance of the handler class, which will use asyncore to
131 serve the client.
132
133 """
134 accept_result = utils.IgnoreSignals(self.accept)
135 if accept_result is not None:
136 connected_socket, client_address = accept_result
137 if self.family == socket.AF_UNIX:
138
139
140 client_address = netutils.GetSocketCredentials(connected_socket)
141 logging.info("Accepted connection from %s",
142 netutils.FormatAddress(client_address, family=self.family))
143 self.handle_connection(connected_socket, client_address)
144
146 """Handle an already accepted connection.
147
148 """
149 raise NotImplementedError
150
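# Illustrative sketch (not part of the original module): how a daemon might
# subclass AsyncStreamServer, overriding handle_connection() so that each
# accepted socket gets its own asyncore handler. The class name, terminator
# and port below are assumptions made only for this example; the handler used
# is the AsyncTerminatedMessageStream class defined further down in this file.
#
#   class _ExampleStreamServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       # each client gets its own asynchat-based handler, serviced by the
#       # same asyncore event loop as the listening socket
#       AsyncTerminatedMessageStream(connected_socket, client_address,
#                                    "\3", socket.AF_INET, None)
#
#   server = _ExampleStreamServer(socket.AF_INET, ("127.0.0.1", 1234))
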
153 """A terminator separated message stream asyncore module.
154
155 Handles a stream connection receiving messages terminated by a defined
156 separator. For each complete message handle_message is called.
157
158 """
159 - def __init__(self, connected_socket, peer_address, terminator, family,
160 unhandled_limit):
161 """AsyncTerminatedMessageStream constructor.
162
163 @type connected_socket: socket.socket
164 @param connected_socket: connected stream socket to receive messages from
165 @param peer_address: family-specific peer address
166 @type terminator: string
167 @param terminator: terminator separating messages in the stream
168 @type family: integer
169 @param family: socket family
170 @type unhandled_limit: integer or None
171 @param unhandled_limit: maximum unanswered messages
172
173 """
174
175
176 asynchat.async_chat.__init__(self, connected_socket)
177 self.connected_socket = connected_socket
178
179
180
181 self.family = family
182 self.peer_address = peer_address
183 self.terminator = terminator
184 self.unhandled_limit = unhandled_limit
185 self.set_terminator(terminator)
186 self.ibuffer = []
187 self.receive_count = 0
188 self.send_count = 0
189 self.oqueue = collections.deque()
190 self.iqueue = collections.deque()
191
192
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

216 """Handle a terminated message.
217
218 @type message: string
219 @param message: message to handle
220 @type message_id: integer
221 @param message_id: stream's message sequence number
222
223 """
224 pass
225
226
227
229 """Send a message to the remote peer. This function is thread-safe.
230
231 @type message: string
232 @param message: message to send, without the terminator
233
234 @warning: If calling this function from a thread different than the one
235 performing the main asyncore loop, remember that you have to wake that one
236 up.
237
238 """
239
240
241
242
243
244 self.oqueue.append(message)
245
246
250
251
253
254
255
256 return asynchat.async_chat.writable(self) or self.oqueue
257
258
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()

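# Illustrative sketch (not part of the original module): a minimal echo
# handler built on AsyncTerminatedMessageStream. The class name, terminator
# and unhandled_limit are assumptions chosen for the example; handle_message()
# answers each received message via the thread-safe send_message().
#
#   class _ExampleEchoStream(AsyncTerminatedMessageStream):
#     def __init__(self, connected_socket, client_address):
#       AsyncTerminatedMessageStream.__init__(self, connected_socket,
#                                             client_address, "\3",
#                                             socket.AF_INET, 10)
#
#     def handle_message(self, message, message_id):
#       # every reply increases send_count, which lets more queued messages
#       # (up to unhandled_limit) be processed from iqueue
#       self.send_message("echo %d: %s" % (message_id, message))
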
290 """An improved asyncore udp socket.
291
292 """
301
302
308
309
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore the 'flow info' and 'scope id' parts of the IPv6 address
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

324 """Handle an already read udp datagram
325
326 """
327 raise NotImplementedError
328
329
331
332
333 return bool(self._out_queue)
334
335
337 if not self._out_queue:
338 logging.error("handle_write called with empty output queue")
339 return
340 (ip, port, payload) = self._out_queue[0]
341 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
342 self._out_queue.pop(0)
343
352
354 """Process the next datagram, waiting for it if necessary.
355
356 @type timeout: float
357 @param timeout: how long to wait for data
358 @rtype: boolean
359 @return: True if some data has been handled, False otherwise
360
361 """
362 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
363 if result is not None and result & select.POLLIN:
364 self.handle_read()
365 return True
366 else:
367 return False
368
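# Illustrative sketch (not part of the original module): a minimal
# AsyncUDPSocket subclass that answers every datagram it receives. The class
# name, port and payload are assumptions made for the example; either the
# asyncore main loop or process_next_packet() drives handle_read/handle_write.
#
#   class _ExamplePingSocket(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       # queue a reply; it is flushed once the socket becomes writable
#       self.enqueue_send(ip, port, "pong")
#
#   sock = _ExamplePingSocket(socket.AF_INET)
#   sock.bind(("127.0.0.1", 1234))
#   while sock.process_next_packet(timeout=1.0):
#     pass
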
371 """A way to notify the asyncore loop that something is going on.
372
373 If an asyncore daemon is multithreaded when a thread tries to push some data
374 to a socket, the main loop handling asynchronous requests might be sleeping
375 waiting on a select(). To avoid this it can create an instance of the
376 AsyncAwaker, which other threads can use to wake it up.
377
378 """
380 """Constructor for AsyncAwaker
381
382 @type signal_fn: function
383 @param signal_fn: function to call when awaken
384
385 """
386 GanetiBaseAsyncoreDispatcher.__init__(self)
387 assert signal_fn == None or callable(signal_fn)
388 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
389 socket.SOCK_STREAM)
390 self.in_socket.setblocking(0)
391 self.in_socket.shutdown(socket.SHUT_WR)
392 self.out_socket.shutdown(socket.SHUT_RD)
393 self.set_socket(self.in_socket)
394 self.need_signal = True
395 self.signal_fn = signal_fn
396 self.connected = True
397
  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
411 """Signal the asyncore main loop.
412
413 Any data we send here will be ignored, but it will cause the select() call
414 to return.
415
416 """
417
418
419 if self.need_signal:
420 self.need_signal = False
421 self.out_socket.send("\0")
422
423
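# Illustrative sketch (not part of the original module): waking the asyncore
# loop from a worker thread. The queue name and callback are assumptions for
# the example; AsyncAwaker.signal() is the only thread-safe entry point.
#
#   work_queue = collections.deque()
#   awaker = AsyncAwaker(signal_fn=lambda: logging.debug("woken up"))
#
#   def _worker_thread_push(item):
#     # called from another thread: enqueue work, then wake the main loop so
#     # it stops sleeping in select() and can notice the new item
#     work_queue.append(item)
#     awaker.signal()

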
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True

    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)

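# Illustrative sketch (not part of the original module): a bare-bones daemon
# body built around Mainloop. The interval and callback are assumptions for
# the example; scheduler.enter() registers timed events and Run() alternates
# between due events and asyncore activity until SIGTERM/SIGINT arrives.
#
#   def _ExampleExecFn(options, args, prep_results):
#     mainloop = Mainloop()
#
#     def _Tick():
#       logging.info("periodic housekeeping")
#       # re-arm ourselves for the next run, 60 seconds from now
#       mainloop.scheduler.enter(60, 1, _Tick, [])
#
#     mainloop.scheduler.enter(60, 1, _Tick, [])
#     mainloop.Run()
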
497 """Verifies the process uid matches the configured uid.
498
499 This method verifies that a daemon is started as the user it is
500 intended to be run
501
502 @param daemon_name: The name of daemon to be started
503 @return: A tuple with the first item indicating success or not,
504 the second item current uid and third with expected uid
505
506 """
507 getents = runtime.GetEnts()
508 running_uid = os.getuid()
509 daemon_uids = {
510 constants.MASTERD: getents.masterd_uid,
511 constants.RAPI: getents.rapi_uid,
512 constants.NODED: getents.noded_uid,
513 constants.CONFD: getents.confd_uid,
514 }
515
516 return (daemon_uids[daemon_name] == running_uid, running_uid,
517 daemon_uids[daemon_name])
518
521 """Try to format an error better.
522
523 Since we're dealing with daemon startup errors, in many cases this
524 will be due to socket error and such, so we try to format these cases better.
525
526 @param err: an exception object
527 @rtype: string
528 @return: the formatted error description
529
530 """
531 try:
532 if isinstance(err, socket.error):
533 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
534 elif isinstance(err, EnvironmentError):
535 if err.filename is None:
536 return "%s (errno=%s)" % (err.strerror, err.errno)
537 else:
538 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
539 err.errno)
540 else:
541 return str(err)
542 except Exception:
543 logging.exception("Error while handling existing error %s", err)
544 return "%s" % str(err)
545
548 """Handler for SIGHUP.
549
550 @param reopen_fn: List of callback functions for reopening log files
551
552 """
553 logging.info("Reopening log files after receiving SIGHUP")
554
555 for fn in reopen_fn:
556 if fn:
557 fn()
558
559
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
    its result will be passed as the third parameter to exec_fn, or
    if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held,
    and runs the daemon itself
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # if the cluster's primary IP family is IPv6, bind to the IPv6 wildcard
    # address by default instead of the IPv4 one
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = \
      utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
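
# Illustrative sketch (not part of the original module): how a daemon's entry
# point typically wires itself into GenericMain. The daemon name, parser and
# callbacks below are assumptions made for the example, not taken from any
# real Ganeti daemon.
#
#   import optparse
#
#   def _ExampleCheckFn(options, args):
#     if args:
#       print >> sys.stderr, "Usage: example-daemon [-f] [-d]"
#       sys.exit(constants.EXIT_FAILURE)
#
#   def _ExamplePrepareFn(options, args):
#     return None   # whatever is returned here is handed to exec_fn
#
#   def main():
#     parser = optparse.OptionParser(description="Example daemon")
#     # _ExampleExecFn is the Mainloop-based sketch shown earlier in this file
#     GenericMain(constants.NODED, parser,
#                 _ExampleCheckFn, _ExamplePrepareFn, _ExampleExecFn)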