Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module with helper classes and functions for daemons""" 
 23   
 24   
 25  import asyncore 
 26  import asynchat 
 27  import collections 
 28  import grp 
 29  import os 
 30  import pwd 
 31  import signal 
 32  import logging 
 33  import sched 
 34  import time 
 35  import socket 
 36  import select 
 37  import sys 
 38   
 39  from ganeti import utils 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import netutils 
 43   
 44   
 45  _DEFAULT_RUN_USER = "root" 
 46  _DEFAULT_RUN_GROUP = "root" 
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to abort scheduler.run().

  """
53
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Rather than sleeping, this delay function serves pending asyncore events
  for at most C{timeout} seconds and then unconditionally raises
  L{SchedulerBreakout}.  That forces the enclosing scheduler.run()
  invocation to return, giving the main loop a chance to check for signals;
  the main loop then calls run() again, at which point any due events are
  actually processed.

  This dance is needed because scheduler.run() does not support a count=
  argument the way asyncore.loop does, and the sched module explicitly
  allows the delay function to raise an exception to interrupt it.

  @type timeout: float
  @param timeout: maximum time to spend serving asyncore events

  """
  asyncore.loop(use_poll=True, count=1, timeout=timeout)
  raise SchedulerBreakout()
74
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    # Plug in AsyncoreDelayFunction so that waiting for the next scheduled
    # event also serves asyncore requests instead of blocking in sleep()
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Provides common defaults for Ganeti dispatchers: errors are logged
  rather than aborting the asyncore loop, and sockets are not polled for
  writability unless a subclass overrides writable().

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
101
def FormatAddress(family, address):
  """Return a human-readable rendering of a client's address.

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  addr_len = len(address)
  if family == socket.AF_INET and addr_len == 2:
    # IPv4: (host, port) pair
    return "%s:%d" % address
  if family == socket.AF_UNIX and addr_len == 3:
    # Unix socket: peer credentials triple
    return "pid=%s, uid=%s, gid=%s" % address
  # Fallback for anything we don't know how to pretty-print
  return str(address)
118
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # Backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() returned nothing usable (e.g. interrupted); try again later
      return
    (connected_socket, client_address) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      client_address = netutils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 FormatAddress(self.family, client_address))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
170
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer collects the chunks of the message currently being received
    self.ibuffer = []
    # receive_count doubles as the next message's sequence number
    self.receive_count = 0
    self.send_count = 0
    # deques are thread-safe, so the queues need no extra locking
    self.oqueue = collections.deque()  # messages waiting to be sent
    self.iqueue = collections.deque()  # received messages not yet handled

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    """Buffer a chunk of the message currently being received.

    """
    self.ibuffer.append(data)

  def _can_handle_message(self):
    """Tell whether a newly received message may be handled right away.

    Always true when no unhandled_limit is set; otherwise the number of
    unanswered messages must be below the limit and no earlier messages may
    still be waiting in the input queue (which preserves ordering).

    """
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    """Dispatch a complete message, or park it if the limit was reached.

    """
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    """Only read from the socket while the next message can be handled.

    """
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Check for writability, also considering our own output queue.

    """
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Push a queued reply out and unpark a waiting input message, if any.

    """
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection shutdown and close the socket.

    """
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    """Treat exceptional socket conditions as a disconnect.

    """
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
307
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    """Ignore the UDP pseudo-connect notification.

    Python thinks that the first udp message from a source qualifies as a
    "connect" and further ones are part of the same connection. We beg to
    differ and treat all messages equally.

    """

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one incoming datagram and dispatch it to handle_datagram.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      # nothing was received (interrupted); wait for the next event
      return
    (payload, address) = recv_result
    if self._family == socket.AF_INET6:
      # IPv6 addresses also carry 'flow info' and 'scope id', which we
      # don't need here
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Only poll for writability while datagrams are queued.

    """
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # only dequeue once the send has gone through
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        allowed datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    condition = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if condition is not None and condition & select.POLLIN:
      self.handle_read()
      return True
    return False
388
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # "is None", not "== None": identity check is the correct (and PEP 8
    # mandated) way to test for None
    assert signal_fn is None or callable(signal_fn)
    # in_socket is watched by the asyncore loop; out_socket is written to
    # by signal() from any thread
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end is used in one direction only, so shut down the other half
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # need_signal is True when no wakeup byte is known to be in flight
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup bytes and invoke the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
442
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          # The scheduler's delay function serves asyncore events and raises
          # SchedulerBreakout, so this call returns regularly even while
          # events are still pending
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        # no timed events registered: just serve one round of asyncore events
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # termination signals end the loop after the waiters have run
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
510
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
      and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options shared by every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # "ssl" only exists as an option when SSL defaults were given above,
  # hence the getattr with a False fallback
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Resolve the run-as identity before daemonizing so a bad name fails fast
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  # Hold the pid file for the whole daemon lifetime; remove it on any exit
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)
636