Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module with helper classes and functions for daemons""" 
 23   
 24   
 25  import asyncore 
 26  import asynchat 
 27  import collections 
 28  import os 
 29  import signal 
 30  import logging 
 31  import sched 
 32  import time 
 33  import socket 
 34  import select 
 35  import sys 
 36   
 37  from ganeti import utils 
 38  from ganeti import constants 
 39  from ganeti import errors 
 40  from ganeti import netutils 
 41  from ganeti import ssconf 
 42  from ganeti import runtime 
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to abort scheduler.run().

  Forcing the current scheduler.run() invocation to terminate gives the
  main loop a chance to check for pending signals between events.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend serving asyncore events

  """
  # Serve at most one asyncore event (or wait up to "timeout"), then break
  # out of scheduler.run() so the caller can check for signals.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time),
      passed through to sched.scheduler; the delay function is fixed to
      L{AsyncoreDelayFunction} so waiting also serves asyncore events

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Common base class for Ganeti's asyncore dispatchers.

  Centralizes error logging and makes dispatchers uninterested in write
  readiness unless a subclass says otherwise.

  """
  # overrides the asyncore.dispatcher method of the same name
  def handle_error(self):
    """Log any error raised while handling a request, then keep going.

    """
    logging.exception("Error while handling asyncore request")

  # overrides the asyncore.dispatcher method of the same name
  def writable(self):
    """By default we are never interested in write events.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # overrides the asyncore.dispatcher method of the same name
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() was interrupted without yielding a connection
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # for unix sockets accept() returns nothing meaningful as the peer
      # address, so use the socket credentials instead
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # partial data of the message currently being received
    self.ibuffer = []
    # number of messages fully received / replies sent so far
    self.receive_count = 0
    self.send_count = 0
    # deques are thread-safe for append/popleft, which send_message relies on
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # accumulate chunks until the terminator is seen
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # A message can be handled immediately when no limit is configured, or
    # when the number of unanswered messages is below the limit and nothing
    # is already waiting in the input queue (preserves ordering).
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a full message has arrived; assemble it and either dispatch it or
    # queue it until enough replies have been sent
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection being closed, then close it.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data or exceptional condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued and sent only when the socket reports
  itself writable; incoming datagrams are dispatched to
  L{handle_datagram}.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Use a deque (as the other classes in this module do) so that removing
    # the datagram at the front of the queue is O(1) rather than the O(n)
    # cost of list.pop(0)
    self._out_queue = collections.deque()
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and hand it over to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest enqueued datagram.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # peek first and only remove after the send, so the datagram is not
    # lost if sendto raises
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.popleft()

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @type ip: string
    @param ip: destination ip address
    @type port: integer
    @param port: destination port
    @type payload: string
    @param payload: datagram payload (at most MAX_UDP_DATA_SIZE bytes)
    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # identity check, not equality ("== None" would invoke __eq__)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # make the pair unidirectional: in_socket only reads wakeup tokens,
    # out_socket only writes them
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and re-arm the signalling flag.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via L{RegisterSignal}
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by the scheduler's delay function after serving asyncore
          # events, so we get a chance to check for signals below
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; SIGCHLD only notifies waiters
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  ents = runtime.GetEnts()
  uid_map = {
    constants.MASTERD: ents.masterd_uid,
    constants.RAPI: ents.rapi_uid,
    constants.NODED: ents.noded_uid,
    constants.CONFD: ents.confd_uid,
    }
  expected_uid = uid_map[daemon_name]
  actual_uid = os.getuid()
  return (expected_uid == actual_uid, actual_uid, expected_uid)
514 515 -def _BeautifyError(err):
516 """Try to format an error better. 517 518 Since we're dealing with daemon startup errors, in many cases this 519 will be due to socket error and such, so we try to format these cases better. 520 521 @param err: an exception object 522 @rtype: string 523 @return: the formatted error description 524 525 """ 526 try: 527 if isinstance(err, socket.error): 528 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0]) 529 elif isinstance(err, EnvironmentError): 530 if err.filename is None: 531 return "%s (errno=%s)" % (err.strerror, err.errno) 532 else: 533 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename, 534 err.errno) 535 else: 536 return str(err) 537 except Exception: # pylint: disable-msg=W0703 538 logging.exception("Error while handling existing error %s", err) 539 return "%s" % str(err)
540
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
          (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    # "ssl" only exists when default_ssl_key/cert were given above
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    utils.CloseFDs()
    wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    wpipe = None

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    # NOTE(review): nested try blocks — presumably kept separate because
    # combined try/except/finally is unavailable before Python 2.5; confirm
    # before flattening
    try:
      utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                         debug=options.debug,
                         stderr_logging=not options.fork,
                         multithreaded=multithreaded,
                         program=daemon_name,
                         syslog=options.syslog,
                         console_logging=console_logging)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
      logging.info("%s daemon startup", daemon_name)
    except Exception, err:
      # report the startup failure to the parent process (if daemonized)
      # before re-raising
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemovePidFile(daemon_name)