22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import grp
29 import os
30 import pwd
31 import signal
32 import logging
33 import sched
34 import time
35 import socket
36 import select
37 import sys
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import netutils
43
44
45 _DEFAULT_RUN_USER = "root"
46 _DEFAULT_RUN_GROUP = "root"
50 """Exception used to get out of the scheduler loop
51
52 """
53
56 """Asyncore-compatible scheduler delay function.
57
58 This is a delay function for sched that, rather than actually sleeping,
59 executes asyncore events happening in the meantime.
60
61 After an event has occurred, rather than returning, it raises a
62 SchedulerBreakout exception, which will force the current scheduler.run()
63 invocation to terminate, so that we can also check for signals. The main loop
64 will then call the scheduler run again, which will allow it to actually
65 process any due events.
66
67 This is needed because scheduler.run() doesn't support a count=..., as
68 asyncore loop, and the scheduler module documents throwing exceptions from
69 inside the delay function as an allowed usage model.
70
71 """
72 asyncore.loop(timeout=timeout, count=1, use_poll=True)
73 raise SchedulerBreakout()
74
77 """Event scheduler integrated with asyncore
78
79 """
82
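
# Example (illustrative sketch, not part of the original module): with this
# scheduler a daemon can run periodic tasks from its Mainloop (defined below)
# while asyncore keeps serving I/O; the one-minute interval is arbitrary.
#
#   mainloop = Mainloop()
#
#   def _Periodic():
#     logging.debug("periodic housekeeping")
#     mainloop.scheduler.enter(60.0, 1, _Periodic, [])  # re-arm the timer
#
#   mainloop.scheduler.enter(60.0, 1, _Periodic, [])
#   mainloop.Run()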
85 """Base Ganeti Asyncore Dispacher
86
87 """
88
90 """Log an error in handling any request, and proceed.
91
92 """
93 logging.exception("Error while handling asyncore request")
94
95
97 """Most of the time we don't want to check for writability.
98
99 """
100 return False
101


def FormatAddress(family, address):
  """Format a client's address

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  if family == socket.AF_INET and len(address) == 2:
    return "%s:%d" % address
  elif family == socket.AF_UNIX and len(address) == 3:
    return "pid=%s, uid=%s, gid=%s" % address
  else:
    return str(address)
121 """A stream server to use with asyncore.
122
123 Each request is accepted, and then dispatched to a separate asyncore
124 dispatcher to handle.
125
126 """
127
128 _REQUEST_QUEUE_SIZE = 5
129
131 """Constructor for AsyncUnixStreamSocket
132
133 @type family: integer
134 @param family: socket family (one of socket.AF_*)
135 @type address: address family dependent
136 @param address: address to bind the socket to
137
138 """
139 GanetiBaseAsyncoreDispatcher.__init__(self)
140 self.family = family
141 self.create_socket(self.family, socket.SOCK_STREAM)
142 self.set_reuse_addr()
143 self.bind(address)
144 self.listen(self._REQUEST_QUEUE_SIZE)
145
146
148 """Accept a new client connection.
149
150 Creates a new instance of the handler class, which will use asyncore to
151 serve the client.
152
153 """
154 accept_result = utils.IgnoreSignals(self.accept)
155 if accept_result is not None:
156 connected_socket, client_address = accept_result
157 if self.family == socket.AF_UNIX:
158
159
160 client_address = netutils.GetSocketCredentials(connected_socket)
161 logging.info("Accepted connection from %s",
162 FormatAddress(self.family, client_address))
163 self.handle_connection(connected_socket, client_address)
164
166 """Handle an already accepted connection.
167
168 """
169 raise NotImplementedError
170
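
# Example (illustrative sketch, not part of the original module): a concrete
# server dispatches each accepted connection to its own asyncore handler;
# "MyHandler" is a hypothetical handler class (AsyncTerminatedMessageStream
# below is one suitable base class for it), and the socket path is arbitrary.
#
#   class MyStreamServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       MyHandler(connected_socket, client_address)
#
#   server = MyStreamServer(socket.AF_UNIX, "/tmp/example.sock")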
173 """A terminator separated message stream asyncore module.
174
175 Handles a stream connection receiving messages terminated by a defined
176 separator. For each complete message handle_message is called.
177
178 """
179 - def __init__(self, connected_socket, peer_address, terminator, family,
180 unhandled_limit):
181 """AsyncTerminatedMessageStream constructor.
182
183 @type connected_socket: socket.socket
184 @param connected_socket: connected stream socket to receive messages from
185 @param peer_address: family-specific peer address
186 @type terminator: string
187 @param terminator: terminator separating messages in the stream
188 @type family: integer
189 @param family: socket family
190 @type unhandled_limit: integer or None
191 @param unhandled_limit: maximum unanswered messages
192
193 """
194
195
196 asynchat.async_chat.__init__(self, connected_socket)
197 self.connected_socket = connected_socket
198
199
200
201 self.family = family
202 self.peer_address = peer_address
203 self.terminator = terminator
204 self.unhandled_limit = unhandled_limit
205 self.set_terminator(terminator)
206 self.ibuffer = []
207 self.receive_count = 0
208 self.send_count = 0
209 self.oqueue = collections.deque()
210 self.iqueue = collections.deque()
211

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking whether the message
    # can be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))
236 """Handle a terminated message.
237
238 @type message: string
239 @param message: message to handle
240 @type message_id: integer
241 @param message_id: stream's message sequence number
242
243 """
244 pass
245
246
247
249 """Send a message to the remote peer. This function is thread-safe.
250
251 @type message: string
252 @param message: message to send, without the terminator
253
254 @warning: If calling this function from a thread different than the one
255 performing the main asyncore loop, remember that you have to wake that one
256 up.
257
258 """
259
260
261
262
263
264 self.oqueue.append(message)
265
266

  # this method is overriding an asynchat.async_chat method
  def readable(self):
    # only read from the socket if we can handle new messages
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asynchat.async_chat method
  def writable(self):
    # the output queue may be filled just after we called writable; this only
    # matters if something else is going to wake up the select() anyway
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asynchat.async_chat method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()
302 """Log an error in handling any request, and proceed.
303
304 """
305 logging.exception("Error while handling asyncore request")
306 self.close_log()
307
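
# Example (illustrative sketch, not part of the original module): a handler
# that echoes back every message it receives; the class name and terminator
# are arbitrary.
#
#   class EchoStream(AsyncTerminatedMessageStream):
#     def handle_message(self, message, message_id):
#       logging.debug("Got message %s: %s", message_id, message)
#       self.send_message(message)  # queued, flushed by the asyncore loop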
310 """An improved asyncore udp socket.
311
312 """
321
322
328
329

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)
344 """Handle an already read udp datagram
345
346 """
347 raise NotImplementedError
348
349

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" %
                                    (len(payload),
                                     constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))
374 """Process the next datagram, waiting for it if necessary.
375
376 @type timeout: float
377 @param timeout: how long to wait for data
378 @rtype: boolean
379 @return: True if some data has been handled, False otherwise
380
381 """
382 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
383 if result is not None and result & select.POLLIN:
384 self.handle_read()
385 return True
386 else:
387 return False
388
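
# Example (illustrative sketch, not part of the original module): a minimal
# UDP responder; the class name, bind address and port are arbitrary.
#
#   class PingResponder(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       self.enqueue_send(ip, port, "pong")
#
#   responder = PingResponder(socket.AF_INET)
#   responder.bind(("0.0.0.0", 5000))
#   Mainloop().Run()  # asyncore then handles both reads and queued writes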
391 """A way to notify the asyncore loop that something is going on.
392
393 If an asyncore daemon is multithreaded when a thread tries to push some data
394 to a socket, the main loop handling asynchronous requests might be sleeping
395 waiting on a select(). To avoid this it can create an instance of the
396 AsyncAwaker, which other threads can use to wake it up.
397
398 """
400 """Constructor for AsyncAwaker
401
402 @type signal_fn: function
403 @param signal_fn: function to call when awaken
404
405 """
406 GanetiBaseAsyncoreDispatcher.__init__(self)
407 assert signal_fn == None or callable(signal_fn)
408 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
409 socket.SOCK_STREAM)
410 self.in_socket.setblocking(0)
411 self.in_socket.shutdown(socket.SHUT_WR)
412 self.out_socket.shutdown(socket.SHUT_RD)
413 self.set_socket(self.in_socket)
414 self.need_signal = True
415 self.signal_fn = signal_fn
416 self.connected = True
417
418

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()
431 """Signal the asyncore main loop.
432
433 Any data we send here will be ignored, but it will cause the select() call
434 to return.
435
436 """
437
438
439 if self.need_signal:
440 self.need_signal = False
441 self.out_socket.send("\0")
442
443
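
# Example (illustrative sketch, not part of the original module): a worker
# thread queueing a reply and waking the asyncore loop; "handler" stands for
# any AsyncTerminatedMessageStream instance owned by the daemon.
#
#   awaker = AsyncAwaker()
#
#   def _WorkerThread(handler, reply):
#     handler.send_message(reply)  # thread-safe, only appends to a deque
#     awaker.signal()              # forces select() in the main loop to return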


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True

    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether any signals arrived while we were busy
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
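
# Example (illustrative sketch, not part of the original module): registering
# a signal receiver with the mainloop; "_ChildReaper" is a hypothetical class.
#
#   class _ChildReaper(object):
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         logging.debug("A child process terminated")
#
#   mainloop = Mainloop()
#   mainloop.RegisterSignal(_ChildReaper())
#   mainloop.Run()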


def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
    and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
    runs the daemon itself
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
547 optionparser.add_option("-f", "--foreground", dest="fork",
548 help="Don't detach from the current terminal",
549 default=True, action="store_false")
550 optionparser.add_option("-d", "--debug", dest="debug",
551 help="Enable some debug messages",
552 default=False, action="store_true")
553 optionparser.add_option("--syslog", dest="syslog",
554 help="Enable logging to syslog (except debug"
555 " messages); one of 'no', 'yes' or 'only' [%s]" %
556 constants.SYSLOG_USAGE,
557 default=constants.SYSLOG_USAGE,
558 choices=["no", "yes", "only"])
559
560 if daemon_name in constants.DAEMONS_PORTS:
561 default_bind_address = constants.IP4_ADDRESS_ANY
562 default_port = netutils.GetDaemonPort(daemon_name)
563
564
565 optionparser.add_option("-p", "--port", dest="port",
566 help="Network port (default: %s)" % default_port,
567 default=default_port, type="int")
568 optionparser.add_option("-b", "--bind", dest="bind_address",
569 help=("Bind address (default: %s)" %
570 default_bind_address),
571 default=default_bind_address, metavar="ADDRESS")
572

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group does not exist on"
                                      " system: %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)
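

# Example (illustrative sketch, not part of the original module): a typical
# daemon entry point built on GenericMain. The daemon name below stands for
# any name known to the constants module, the check/exec functions are
# placeholders, and the caller would have to import optparse itself.
#
#   def CheckMyDaemon(options, args):
#     if args:
#       print >> sys.stderr, "This daemon takes no arguments"
#       sys.exit(constants.EXIT_FAILURE)
#
#   def ExecMyDaemon(options, args):
#     mainloop = Mainloop()
#     mainloop.Run()
#
#   def main():
#     parser = optparse.OptionParser(usage="%prog [-f] [-d]")
#     GenericMain(constants.NODED, parser, [], CheckMyDaemon, ExecMyDaemon)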