Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module with helper classes and functions for daemons""" 
 23   
 24   
 25  import asyncore 
 26  import asynchat 
 27  import collections 
 28  import os 
 29  import signal 
 30  import logging 
 31  import sched 
 32  import time 
 33  import socket 
 34  import select 
 35  import sys 
 36   
 37  from ganeti import utils 
 38  from ganeti import constants 
 39  from ganeti import errors 
 40  from ganeti import netutils 
 41  from ganeti import ssconf 
 42  from ganeti import runtime 
 43  from ganeti import compat 
class SchedulerBreakout(Exception):
  """Control-flow exception raised to break out of a scheduler.run() call.

  Raised by the delay function so the main loop regains control between
  scheduled events (e.g. to check for pending signals).

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of sleeping for C{timeout} seconds, this delay function runs a
  single asyncore polling iteration, so I/O events are serviced while the
  scheduler is "waiting".

  Rather than returning afterwards, it always raises L{SchedulerBreakout},
  which aborts the current scheduler.run() invocation. The caller's main
  loop can then check for signals and re-enter scheduler.run() to process
  any events that have become due. The sched module explicitly allows the
  delay function to raise, and scheduler.run() has no count=... argument
  like asyncore.loop does, hence this workaround.

  @type timeout: float
  @param timeout: maximum time to spend in the asyncore loop iteration
  @raise SchedulerBreakout: always, after one asyncore iteration

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler whose waits are spent running the asyncore loop.

  A plain sched.scheduler sleeps between events; this subclass plugs in
  L{AsyncoreDelayFunction} so asyncore I/O keeps being processed instead.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Common base class for Ganeti's asyncore dispatchers: errors are logged
  rather than aborting the loop, and sockets are not polled for
  writability by default.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    asyncore's default handle_error would otherwise close the socket or
    print a traceback; we just log and continue serving.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    Returning False keeps the socket out of the select/poll write set,
    avoiding busy-looping; subclasses override this when they have output
    pending.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # backlog passed to listen(2)
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    Creates, binds and puts the listening socket into listen mode.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    # IgnoreSignals retries accept() if it is interrupted by a signal
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    @raise NotImplementedError: this must be implemented by subclasses

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  Optionally limits the number of unanswered incoming messages: when
  C{unhandled_limit} is reached, further complete messages are parked in
  C{iqueue} and only dispatched once a reply has been sent.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer: chunks of the in-progress (not yet terminated) message
    self.ibuffer = []
    # receive_count/send_count track complete messages in/out, used to
    # enforce unhandled_limit
    self.receive_count = 0
    self.send_count = 0
    # oqueue: outgoing messages; deque appends/pops are atomic, so
    # send_message can be called from other threads without locking
    self.oqueue = collections.deque()
    # iqueue: parked (message, message_id) pairs awaiting dispatch
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    """Buffer a chunk of data until the terminator is seen."""
    self.ibuffer.append(data)

  def _can_handle_message(self):
    """Tell whether the next complete message may be dispatched now.

    NOTE: due to operator precedence this reads as
    "limit is None or (under limit and iqueue empty)"; when no limit is
    set this is always True, which is consistent since iqueue can only
    fill up when a limit is in force.

    """
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    """Assemble the buffered message and dispatch or park it."""
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    """Only poll the socket for reading if we can accept a message."""
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Poll for writability when asynchat has data or oqueue is non-empty."""
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Push one queued reply and, if possible, dispatch one parked message."""
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the disconnection and close the socket."""
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    """Treat exceptional socket conditions as a disconnect."""
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and dispatches incoming ones to
  L{handle_datagram}.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._family = family
    # queue of (ip, port, payload) tuples still to be sent
    self._out_queue = []
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and hand it over to L{handle_datagram}."""
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      # recvfrom was interrupted by a signal; the next poll will retry
      return
    payload, address = recv_result
    if self._family == socket.AF_INET6:
      # we ignore 'flow info' and 'scope id' as we don't need them
      (ip, port) = address[:2]
    else:
      (ip, port) = address
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    @raise NotImplementedError: this must be implemented by subclasses

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Only poll for writability while datagrams are queued."""
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return len(self._out_queue) > 0

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram."""
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    ip, port, payload = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # only dequeue after a successful sendto
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the UDP limit

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    poll_result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if poll_result is None or not (poll_result & select.POLLIN):
      return False
    self.handle_read()
    return True
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # FIX: compare with None by identity ("is"), not equality (PEP 8)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # in_socket is only read from, out_socket only written to; shut down the
    # unused direction of each end
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # need_signal is True when no wakeup byte is currently in flight
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain wakeup bytes and invoke the notification callback, if any."""
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the internal socket pair."""
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self.scheduler = AsyncoreScheduler(time.time)
    self._signal_wait = []

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert signal_handlers and isinstance(signal_handlers, dict), \
        "Broken SignalHandled decorator"

    keep_running = True
    while keep_running:
      # With an empty scheduler, scheduler.run() would return immediately,
      # so run one asyncore iteration directly in that case
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by the delay function so we can check for signals
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          keep_running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
      the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  # map each daemon to the uid it is supposed to run under
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  expected_uid = daemon_uids[daemon_name]
  running_uid = os.getuid()
  return (expected_uid == running_uid, running_uid, expected_uid)
519 520 -def _BeautifyError(err):
521 """Try to format an error better. 522 523 Since we're dealing with daemon startup errors, in many cases this 524 will be due to socket error and such, so we try to format these cases better. 525 526 @param err: an exception object 527 @rtype: string 528 @return: the formatted error description 529 530 """ 531 try: 532 if isinstance(err, socket.error): 533 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0]) 534 elif isinstance(err, EnvironmentError): 535 if err.filename is None: 536 return "%s (errno=%s)" % (err.strerror, err.errno) 537 else: 538 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename, 539 err.errno) 540 else: 541 return str(err) 542 except Exception: # pylint: disable=W0703 543 logging.exception("Error while handling existing error %s", err) 544 return "%s" % str(err)
545
546 547 -def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
548 """Handler for SIGHUP. 549 550 @param reopen_fn: List of callback functions for reopening log files 551 552 """ 553 logging.info("Reopening log files after receiving SIGHUP") 554 555 for fn in reopen_fn: 556 if fn: 557 fn() 558
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  Parses common options, verifies the running uid, daemonizes (unless -f),
  sets up logging, writes the pidfile and finally runs exec_fn.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    # daemonize: close inherited fds and detach; wpipe lets the child report
    # startup errors to the waiting parent before it exits
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = \
      utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      # report the preparation failure to the parent before re-raising
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise
    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    # always drop the pidfile, even if exec_fn raised
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))