1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import os
29 import signal
30 import logging
31 import sched
32 import time
33 import socket
34 import select
35 import sys
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
46 """Exception used to get out of the scheduler loop
47
48 """
49
52 """Asyncore-compatible scheduler delay function.
53
54 This is a delay function for sched that, rather than actually sleeping,
55 executes asyncore events happening in the meantime.
56
57 After an event has occurred, rather than returning, it raises a
58 SchedulerBreakout exception, which will force the current scheduler.run()
59 invocation to terminate, so that we can also check for signals. The main loop
60 will then call the scheduler run again, which will allow it to actually
61 process any due events.
62
63 This is needed because scheduler.run() doesn't support a count=... argument,
64 unlike asyncore.loop(), and the sched module documents throwing exceptions from
65 inside the delay function as an allowed usage model.
66
67 """
68 asyncore.loop(timeout=timeout, count=1, use_poll=True)
69 raise SchedulerBreakout()
70
73 """Event scheduler integrated with asyncore
74
75 """
78
81 """Base Ganeti Asyncore Dispatcher
82
83 """
84
86 """Log an error in handling any request, and proceed.
87
88 """
89 logging.exception("Error while handling asyncore request")
90
91
93 """Most of the time we don't want to check for writability.
94
95 """
96 return False
97
100 """A stream server to use with asyncore.
101
102 Each request is accepted, and then dispatched to a separate asyncore
103 dispatcher to handle.
104
105 """
106
107 _REQUEST_QUEUE_SIZE = 5
108
110 """Constructor for AsyncUnixStreamSocket
111
112 @type family: integer
113 @param family: socket family (one of socket.AF_*)
114 @type address: address family dependent
115 @param address: address to bind the socket to
116
117 """
118 GanetiBaseAsyncoreDispatcher.__init__(self)
119 self.family = family
120 self.create_socket(self.family, socket.SOCK_STREAM)
121 self.set_reuse_addr()
122 self.bind(address)
123 self.listen(self._REQUEST_QUEUE_SIZE)
124
125
127 """Accept a new client connection.
128
129 Creates a new instance of the handler class, which will use asyncore to
130 serve the client.
131
132 """
133 accept_result = utils.IgnoreSignals(self.accept)
134 if accept_result is not None:
135 connected_socket, client_address = accept_result
136 if self.family == socket.AF_UNIX:
137
138
139 client_address = netutils.GetSocketCredentials(connected_socket)
140 logging.info("Accepted connection from %s",
141 netutils.FormatAddress(client_address, family=self.family))
142 self.handle_connection(connected_socket, client_address)
143
145 """Handle an already accepted connection.
146
147 """
148 raise NotImplementedError
149
152 """A terminator separated message stream asyncore module.
153
154 Handles a stream connection receiving messages terminated by a defined
155 separator. For each complete message handle_message is called.
156
157 """
def __init__(self, connected_socket, peer_address, terminator, family,
             unhandled_limit):
  """Set up a terminated-message stream on a connected socket.

  @type connected_socket: socket.socket
  @param connected_socket: connected stream socket to receive messages from
  @param peer_address: family-specific peer address
  @type terminator: string
  @param terminator: terminator separating messages in the stream
  @type family: integer
  @param family: socket family
  @type unhandled_limit: integer or None
  @param unhandled_limit: maximum unanswered messages

  """
  # The socket is handed to the base class positionally, not by keyword
  asynchat.async_chat.__init__(self, connected_socket)

  # Connection details
  self.connected_socket = connected_socket
  self.peer_address = peer_address
  self.family = family
  self.unhandled_limit = unhandled_limit

  # Counters of messages received and sent so far
  self.receive_count = 0
  self.send_count = 0

  # Pieces of the message currently being received
  self.ibuffer = []
  # Complete messages waiting to be handled, as (message, message_id)
  self.iqueue = collections.deque()
  # Outgoing messages waiting to be pushed to the peer
  self.oqueue = collections.deque()

  # Remember the terminator and register it with asynchat
  self.terminator = terminator
  self.set_terminator(terminator)
190
191
193 self.ibuffer.append(data)
194
196 return (self.unhandled_limit is None or
197 (self.receive_count < self.send_count + self.unhandled_limit) and
198 not self.iqueue)
199
200
202 message = "".join(self.ibuffer)
203 self.ibuffer = []
204 message_id = self.receive_count
205
206
207 can_handle = self._can_handle_message()
208 self.receive_count += 1
209 if can_handle:
210 self.handle_message(message, message_id)
211 else:
212 self.iqueue.append((message, message_id))
213
215 """Handle a terminated message.
216
217 @type message: string
218 @param message: message to handle
219 @type message_id: integer
220 @param message_id: stream's message sequence number
221
222 """
223 pass
224
225
226
228 """Send a message to the remote peer. This function is thread-safe.
229
230 @type message: string
231 @param message: message to send, without the terminator
232
233 @warning: If calling this function from a thread different than the one
234 performing the main asyncore loop, remember that you have to wake that one
235 up.
236
237 """
238
239
240
241
242
243 self.oqueue.append(message)
244
245
249
250
252
253
254
255 return asynchat.async_chat.writable(self) or self.oqueue
256
257
259 if self.oqueue:
260
261
262
263 data = self.oqueue.popleft()
264 self.push(data + self.terminator)
265 self.send_count += 1
266 if self.iqueue:
267 self.handle_message(*self.iqueue.popleft())
268 self.initiate_send()
269
274
275
278
279
281 """Log an error in handling any request, and proceed.
282
283 """
284 logging.exception("Error while handling asyncore request")
285 self.close_log()
286
289 """An improved asyncore udp socket.
290
291 """
300
301
307
308
310 recv_result = utils.IgnoreSignals(self.recvfrom,
311 constants.MAX_UDP_DATA_SIZE)
312 if recv_result is not None:
313 payload, address = recv_result
314 if self._family == socket.AF_INET6:
315
316 ip, port, _, _ = address
317 else:
318 ip, port = address
319
320 self.handle_datagram(payload, ip, port)
321
323 """Handle an already read udp datagram
324
325 """
326 raise NotImplementedError
327
328
330
331
332 return bool(self._out_queue)
333
334
336 if not self._out_queue:
337 logging.error("handle_write called with empty output queue")
338 return
339 (ip, port, payload) = self._out_queue[0]
340 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
341 self._out_queue.pop(0)
342
351
353 """Process the next datagram, waiting for it if necessary.
354
355 @type timeout: float
356 @param timeout: how long to wait for data
357 @rtype: boolean
358 @return: True if some data has been handled, False otherwise
359
360 """
361 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
362 if result is not None and result & select.POLLIN:
363 self.handle_read()
364 return True
365 else:
366 return False
367
370 """A way to notify the asyncore loop that something is going on.
371
372 If an asyncore daemon is multithreaded when a thread tries to push some data
373 to a socket, the main loop handling asynchronous requests might be sleeping
374 waiting on a select(). To avoid this it can create an instance of the
375 AsyncAwaker, which other threads can use to wake it up.
376
377 """
379 """Constructor for AsyncAwaker
380
381 @type signal_fn: function
382 @param signal_fn: function to call when awakened
383
384 """
385 GanetiBaseAsyncoreDispatcher.__init__(self)
386 assert signal_fn == None or callable(signal_fn)
387 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
388 socket.SOCK_STREAM)
389 self.in_socket.setblocking(0)
390 self.in_socket.shutdown(socket.SHUT_WR)
391 self.out_socket.shutdown(socket.SHUT_RD)
392 self.set_socket(self.in_socket)
393 self.need_signal = True
394 self.signal_fn = signal_fn
395 self.connected = True
396
397
399 utils.IgnoreSignals(self.recv, 4096)
400 if self.signal_fn:
401 self.signal_fn()
402 self.need_signal = True
403
404
406 asyncore.dispatcher.close(self)
407 self.out_socket.close()
408
410 """Signal the asyncore main loop.
411
412 Any data we send here will be ignored, but it will cause the select() call
413 to return.
414
415 """
416
417
418 if self.need_signal:
419 self.need_signal = False
420 self.out_socket.send("\0")
421
422
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects to be notified (through their OnSignal method) about signals
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Each iteration either runs the scheduler (when it has pending events) or
    performs a single asyncore poll, and then dispatches every signal whose
    handler was flagged in the meantime to the registered receivers. The loop
    ends once SIGTERM or SIGINT has been seen.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True

    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # Raised to leave scheduler.run() so that signals can be checked
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # Only ever switch the flag off: another signal (e.g. SIGCHLD)
          # handled later in this same pass must not cancel a termination
          # request that was already seen.
          if sig in (signal.SIGTERM, signal.SIGINT):
            running = False
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
489
492 """Verifies the process uid matches the configured uid.
493
494 This method verifies that a daemon is started as the user it is
495 intended to be run as.
496
497 @param daemon_name: The name of daemon to be started
498 @return: A tuple with the first item indicating success or not,
499 the second item current uid and third with expected uid
500
501 """
502 getents = runtime.GetEnts()
503 running_uid = os.getuid()
504 daemon_uids = {
505 constants.MASTERD: getents.masterd_uid,
506 constants.RAPI: getents.rapi_uid,
507 constants.NODED: getents.noded_uid,
508 constants.CONFD: getents.confd_uid,
509 }
510
511 return (daemon_uids[daemon_name] == running_uid, running_uid,
512 daemon_uids[daemon_name])
513
516 """Try to format an error better.
517
518 Since we're dealing with daemon startup errors, in many cases this
519 will be due to socket error and such, so we try to format these cases better.
520
521 @param err: an exception object
522 @rtype: string
523 @return: the formatted error description
524
525 """
526 try:
527 if isinstance(err, socket.error):
528 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
529 elif isinstance(err, EnvironmentError):
530 if err.filename is None:
531 return "%s (errno=%s)" % (err.strerror, err.errno)
532 else:
533 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
534 err.errno)
535 else:
536 return str(err)
537 except Exception:
538 logging.exception("Error while handling existing error %s", err)
539 return "%s" % str(err)
540
541
542 -def GenericMain(daemon_name, optionparser,
543 check_fn, prepare_fn, exec_fn,
544 multithreaded=False, console_logging=False,
545 default_ssl_cert=None, default_ssl_key=None):
546 """Shared main function for daemons.
547
548 @type daemon_name: string
549 @param daemon_name: daemon name
550 @type optionparser: optparse.OptionParser
551 @param optionparser: initialized optionparser with daemon-specific options
552 (common -f -d options will be handled by this module)
553 @type check_fn: function which accepts (options, args)
554 @param check_fn: function that checks start conditions and exits if they're
555 not met
556 @type prepare_fn: function which accepts (options, args)
557 @param prepare_fn: function that is run before forking, or None;
558 it's result will be passed as the third parameter to exec_fn, or
559 if None was passed in, we will just pass None to exec_fn
560 @type exec_fn: function which accepts (options, args, prepare_results)
561 @param exec_fn: function that's executed with the daemon's pid file held, and
562 runs the daemon itself.
563 @type multithreaded: bool
564 @param multithreaded: Whether the daemon uses threads
565 @type console_logging: boolean
566 @param console_logging: if True, the daemon will fall back to the system
567 console if logging fails
568 @type default_ssl_cert: string
569 @param default_ssl_cert: Default SSL certificate path
570 @type default_ssl_key: string
571 @param default_ssl_key: Default SSL key path
572
573 """
574 optionparser.add_option("-f", "--foreground", dest="fork",
575 help="Don't detach from the current terminal",
576 default=True, action="store_false")
577 optionparser.add_option("-d", "--debug", dest="debug",
578 help="Enable some debug messages",
579 default=False, action="store_true")
580 optionparser.add_option("--syslog", dest="syslog",
581 help="Enable logging to syslog (except debug"
582 " messages); one of 'no', 'yes' or 'only' [%s]" %
583 constants.SYSLOG_USAGE,
584 default=constants.SYSLOG_USAGE,
585 choices=["no", "yes", "only"])
586
587 if daemon_name in constants.DAEMONS_PORTS:
588 default_bind_address = constants.IP4_ADDRESS_ANY
589 family = ssconf.SimpleStore().GetPrimaryIPFamily()
590
591
592
593 if family == netutils.IP6Address.family:
594 default_bind_address = constants.IP6_ADDRESS_ANY
595
596 default_port = netutils.GetDaemonPort(daemon_name)
597
598
599 optionparser.add_option("-p", "--port", dest="port",
600 help="Network port (default: %s)" % default_port,
601 default=default_port, type="int")
602 optionparser.add_option("-b", "--bind", dest="bind_address",
603 help=("Bind address (default: '%s')" %
604 default_bind_address),
605 default=default_bind_address, metavar="ADDRESS")
606
607 if default_ssl_key is not None and default_ssl_cert is not None:
608 optionparser.add_option("--no-ssl", dest="ssl",
609 help="Do not secure HTTP protocol with SSL",
610 default=True, action="store_false")
611 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
612 help=("SSL key path (default: %s)" %
613 default_ssl_key),
614 default=default_ssl_key, type="string",
615 metavar="SSL_KEY_PATH")
616 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
617 help=("SSL certificate path (default: %s)" %
618 default_ssl_cert),
619 default=default_ssl_cert, type="string",
620 metavar="SSL_CERT_PATH")
621
622
623 utils.no_fork = multithreaded
624
625 options, args = optionparser.parse_args()
626
627 if getattr(options, "ssl", False):
628 ssl_paths = {
629 "certificate": options.ssl_cert,
630 "key": options.ssl_key,
631 }
632
633 for name, path in ssl_paths.iteritems():
634 if not os.path.isfile(path):
635 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
636 sys.exit(constants.EXIT_FAILURE)
637
638
639
640
641
642 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
643 if not result:
644 msg = ("%s started using wrong user ID (%d), expected %d" %
645 (daemon_name, running_uid, expected_uid))
646 print >> sys.stderr, msg
647 sys.exit(constants.EXIT_FAILURE)
648
649 if check_fn is not None:
650 check_fn(options, args)
651
652 if options.fork:
653 utils.CloseFDs()
654 wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
655 else:
656 wpipe = None
657
658 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
659 try:
660 try:
661 utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
662 debug=options.debug,
663 stderr_logging=not options.fork,
664 multithreaded=multithreaded,
665 program=daemon_name,
666 syslog=options.syslog,
667 console_logging=console_logging)
668 if callable(prepare_fn):
669 prep_results = prepare_fn(options, args)
670 else:
671 prep_results = None
672 logging.info("%s daemon startup", daemon_name)
673 except Exception, err:
674 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
675 raise
676
677 if wpipe is not None:
678
679
680 os.close(wpipe)
681
682 exec_fn(options, args, prep_results)
683 finally:
684 utils.RemovePidFile(daemon_name)
685