Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module with helper classes and functions for daemons""" 
 32   
 33   
 34  import asyncore 
 35  import asynchat 
 36  import collections 
 37  import os 
 38  import signal 
 39  import logging 
 40  import sched 
 41  import time 
 42  import socket 
 43  import select 
 44  import sys 
 45   
 46  from ganeti import utils 
 47  from ganeti import constants 
 48  from ganeti import errors 
 49  from ganeti import netutils 
 50  from ganeti import ssconf 
 51  from ganeti import runtime 
 52  from ganeti import compat 
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to abort scheduler.run().

  Breaking out of the scheduler loop lets the main loop regain control
  (e.g. to check for signals) before re-entering the scheduler.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: number
  @param timeout: maximum time to spend processing asyncore events before
    breaking out of the scheduler

  @raise SchedulerBreakout: always, after one asyncore loop iteration

  """
  # Run at most one iteration of the asyncore event loop instead of sleeping
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  The delay function processes asyncore events instead of sleeping, and an
  optional per-run upper bound on the delay can be set via L{run}.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. C{time.time})

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is not None:
      self._max_delay = utils.RunningTimeout(max_delay, False)
    else:
      self._max_delay = None

    try:
      return sched.scheduler.run(self)
    finally:
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    Caps the requested delay at the remaining time of the running timeout,
    if one was set up by L{run}.

    """
    limiter = self._max_delay
    if limiter is not None:
      duration = min(duration, limiter.Remaining())

    return AsyncoreDelayFunction(duration)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Shared base class for Ganeti's asyncore dispatchers.

  Provides common error logging and makes dispatchers non-writable by
  default, so subclasses only opt into write events when needed.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default we're not interested in write events.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (conn, addr) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      addr = netutils.GetSocketCredentials(conn)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(conn, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer accumulates raw chunks until the terminator is seen
    self.ibuffer = []
    # receive_count/send_count track messages received and answered; their
    # difference is the number of currently outstanding (unanswered) messages
    self.receive_count = 0
    self.send_count = 0
    # oqueue: outgoing messages enqueued by send_message (deque operations
    # are atomic, which is what makes send_message thread-safe)
    # iqueue: incoming messages deferred because unhandled_limit was reached
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer partial data; joined into one message in found_terminator
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # Whether a newly received message may be dispatched right away.
    # Note: "and" binds tighter than "or", so this reads as
    #   unhandled_limit is None
    #   or (receive_count < send_count + unhandled_limit and not iqueue);
    # with no limit, messages are always handled immediately and iqueue
    # stays empty.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A complete message has arrived: assemble it and either handle it now
    # or park it in iqueue until a reply frees a slot
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # Log the peer address, then close the connection
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # exceptional socket condition (e.g. out-of-band data): drop the peer
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued and only sent when the socket is writable;
  writability is only checked while the queue is non-empty.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Use a deque for O(1) removal from the front (handle_write dequeues the
    # oldest datagram each time); also consistent with the queues used by
    # AsyncTerminatedMessageStream
    self._out_queue = collections.deque()
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and dispatch it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest enqueued datagram.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # Peek first and only dequeue after the send attempt, preserving the
    # original ordering of operations
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.popleft()

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      allowed UDP data size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    # Build a one-way channel out of a unix socket pair: the "in" end is
    # watched by the asyncore loop, the "out" end is written by signal()
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # drain the wakeup token(s), invoke the callback, then re-arm
    utils.IgnoreSignals(self.recv, 4096)
    callback = self.signal_fn
    if callback:
      callback()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if not self.need_signal:
      return
    self.need_signal = False
    self.out_socket.send(chr(0))
468 469 -class _ShutdownCheck(object):
470 """Logic for L{Mainloop} shutdown. 471 472 """
473 - def __init__(self, fn):
474 """Initializes this class. 475 476 @type fn: callable 477 @param fn: Function returning C{None} if mainloop can be stopped or a 478 duration in seconds after which the function should be called again 479 @see: L{Mainloop.Run} 480 481 """ 482 assert callable(fn) 483 484 self._fn = fn 485 self._defer = None
486
487 - def CanShutdown(self):
488 """Checks whether mainloop can be stopped. 489 490 @rtype: bool 491 492 """ 493 if self._defer and self._defer.Remaining() > 0: 494 # A deferred check has already been scheduled 495 return False 496 497 # Ask mainloop driver whether we can stop or should check again 498 timeout = self._fn() 499 500 if timeout is None: 501 # Yes, can stop mainloop 502 return True 503 504 # Schedule another check in the future 505 self._defer = utils.RunningTimeout(timeout, True) 506 507 return False
508
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # Priority for shutdown-related timed events; most negative so they sort
  # before everything else.
  # NOTE(review): not referenced within this class as shown here — presumably
  # used by callers/subclasses registering scheduler events; confirm.
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects registered via RegisterSignal; each gets OnSignal() callbacks
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
      len(signal_handlers) > 0, \
      "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        # First shutdown signal with a wait callback: ask the driver whether
        # it is safe to stop yet
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      # With no scheduled events, run asyncore directly; otherwise let the
      # scheduler drive asyncore via its delay function (SchedulerBreakout
      # is its normal way of returning control here)
      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
    the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  # Map each daemon to the uid it is supposed to run under
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  expected_uid = daemon_uids[daemon_name]
  running_uid = os.getuid()
  return (expected_uid == running_uid, running_uid, expected_uid)
638 639 -def _BeautifyError(err):
640 """Try to format an error better. 641 642 Since we're dealing with daemon startup errors, in many cases this 643 will be due to socket error and such, so we try to format these cases better. 644 645 @param err: an exception object 646 @rtype: string 647 @return: the formatted error description 648 649 """ 650 try: 651 if isinstance(err, socket.error): 652 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0]) 653 elif isinstance(err, EnvironmentError): 654 if err.filename is None: 655 return "%s (errno=%s)" % (err.strerror, err.errno) 656 else: 657 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename, 658 err.errno) 659 else: 660 return str(err) 661 except Exception: # pylint: disable=W0703 662 logging.exception("Error while handling existing error %s", err) 663 return "%s" % str(err)
664
665 666 -def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
667 """Handler for SIGHUP. 668 669 @param reopen_fn: List of callback functions for reopening log files 670 671 """ 672 logging.info("Reopening log files after receiving SIGHUP") 673 674 for fn in reopen_fn: 675 if fn: 676 fn() 677
678 679 -def GenericMain(daemon_name, optionparser, 680 check_fn, prepare_fn, exec_fn, 681 multithreaded=False, console_logging=False, 682 default_ssl_cert=None, default_ssl_key=None):
683 """Shared main function for daemons. 684 685 @type daemon_name: string 686 @param daemon_name: daemon name 687 @type optionparser: optparse.OptionParser 688 @param optionparser: initialized optionparser with daemon-specific options 689 (common -f -d options will be handled by this module) 690 @type check_fn: function which accepts (options, args) 691 @param check_fn: function that checks start conditions and exits if they're 692 not met 693 @type prepare_fn: function which accepts (options, args) 694 @param prepare_fn: function that is run before forking, or None; 695 it's result will be passed as the third parameter to exec_fn, or 696 if None was passed in, we will just pass None to exec_fn 697 @type exec_fn: function which accepts (options, args, prepare_results) 698 @param exec_fn: function that's executed with the daemon's pid file held, and 699 runs the daemon itself. 700 @type multithreaded: bool 701 @param multithreaded: Whether the daemon uses threads 702 @type console_logging: boolean 703 @param console_logging: if True, the daemon will fall back to the system 704 console if logging fails 705 @type default_ssl_cert: string 706 @param default_ssl_cert: Default SSL certificate path 707 @type default_ssl_key: string 708 @param default_ssl_key: Default SSL key path 709 710 """ 711 optionparser.add_option("-f", "--foreground", dest="fork", 712 help="Don't detach from the current terminal", 713 default=True, action="store_false") 714 optionparser.add_option("-d", "--debug", dest="debug", 715 help="Enable some debug messages", 716 default=False, action="store_true") 717 optionparser.add_option("--syslog", dest="syslog", 718 help="Enable logging to syslog (except debug" 719 " messages); one of 'no', 'yes' or 'only' [%s]" % 720 constants.SYSLOG_USAGE, 721 default=constants.SYSLOG_USAGE, 722 choices=["no", "yes", "only"]) 723 724 family = ssconf.SimpleStore().GetPrimaryIPFamily() 725 # family will default to AF_INET if there is no ssconf file (e.g. 
when 726 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters 727 # <= 2.2 can not be AF_INET6 728 if daemon_name in constants.DAEMONS_PORTS: 729 default_bind_address = constants.IP4_ADDRESS_ANY 730 if family == netutils.IP6Address.family: 731 default_bind_address = constants.IP6_ADDRESS_ANY 732 733 default_port = netutils.GetDaemonPort(daemon_name) 734 735 # For networked daemons we allow choosing the port and bind address 736 optionparser.add_option("-p", "--port", dest="port", 737 help="Network port (default: %s)" % default_port, 738 default=default_port, type="int") 739 optionparser.add_option("-b", "--bind", dest="bind_address", 740 help=("Bind address (default: '%s')" % 741 default_bind_address), 742 default=default_bind_address, metavar="ADDRESS") 743 optionparser.add_option("-i", "--interface", dest="bind_interface", 744 help=("Bind interface"), metavar="INTERFACE") 745 746 if default_ssl_key is not None and default_ssl_cert is not None: 747 optionparser.add_option("--no-ssl", dest="ssl", 748 help="Do not secure HTTP protocol with SSL", 749 default=True, action="store_false") 750 optionparser.add_option("-K", "--ssl-key", dest="ssl_key", 751 help=("SSL key path (default: %s)" % 752 default_ssl_key), 753 default=default_ssl_key, type="string", 754 metavar="SSL_KEY_PATH") 755 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", 756 help=("SSL certificate path (default: %s)" % 757 default_ssl_cert), 758 default=default_ssl_cert, type="string", 759 metavar="SSL_CERT_PATH") 760 761 # Disable the use of fork(2) if the daemon uses threads 762 if multithreaded: 763 utils.DisableFork() 764 765 options, args = optionparser.parse_args() 766 767 if getattr(options, "bind_interface", None) is not None: 768 if options.bind_address != default_bind_address: 769 msg = ("Can't specify both, bind address (%s) and bind interface (%s)" % 770 (options.bind_address, options.bind_interface)) 771 print >> sys.stderr, msg 772 
sys.exit(constants.EXIT_FAILURE) 773 interface_ip_addresses = \ 774 netutils.GetInterfaceIpAddresses(options.bind_interface) 775 if family == netutils.IP6Address.family: 776 if_addresses = interface_ip_addresses[constants.IP6_VERSION] 777 else: 778 if_addresses = interface_ip_addresses[constants.IP4_VERSION] 779 if len(if_addresses) < 1: 780 msg = "Failed to find IP for interface %s" % options.bind_interace 781 print >> sys.stderr, msg 782 sys.exit(constants.EXIT_FAILURE) 783 options.bind_address = if_addresses[0] 784 785 if getattr(options, "ssl", False): 786 ssl_paths = { 787 "certificate": options.ssl_cert, 788 "key": options.ssl_key, 789 } 790 791 for name, path in ssl_paths.iteritems(): 792 if not os.path.isfile(path): 793 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path) 794 sys.exit(constants.EXIT_FAILURE) 795 796 # TODO: By initiating http.HttpSslParams here we would only read the files 797 # once and have a proper validation (isfile returns False on directories) 798 # at the same time. 799 800 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name) 801 if not result: 802 msg = ("%s started using wrong user ID (%d), expected %d" % 803 (daemon_name, running_uid, expected_uid)) 804 print >> sys.stderr, msg 805 sys.exit(constants.EXIT_FAILURE) 806 807 if check_fn is not None: 808 check_fn(options, args) 809 810 log_filename = constants.DAEMONS_LOGFILES[daemon_name] 811 812 if options.fork: 813 # Newer GnuTLS versions (>= 3.3.0) use a library constructor for 814 # initialization and open /dev/urandom on library load time, way before we 815 # fork(). Closing /dev/urandom causes subsequent ganeti.http.client 816 # requests to fail and the process to receive a SIGABRT. As we cannot 817 # reliably detect GnuTLS's socket, we work our way around this by keeping 818 # all fds referring to /dev/urandom open. 
819 noclose_fds = [] 820 for fd in os.listdir("/proc/self/fd"): 821 try: 822 if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom": 823 noclose_fds.append(int(fd)) 824 except EnvironmentError: 825 # The fd might have disappeared (although it shouldn't as we're running 826 # single-threaded). 827 continue 828 829 utils.CloseFDs(noclose_fds=noclose_fds) 830 (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename) 831 else: 832 (wpipe, stdio_reopen_fn) = (None, None) 833 834 log_reopen_fn = \ 835 utils.SetupLogging(log_filename, daemon_name, 836 debug=options.debug, 837 stderr_logging=not options.fork, 838 multithreaded=multithreaded, 839 syslog=options.syslog, 840 console_logging=console_logging) 841 842 # Reopen log file(s) on SIGHUP 843 signal.signal(signal.SIGHUP, 844 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn])) 845 846 try: 847 utils.WritePidFile(utils.DaemonPidFileName(daemon_name)) 848 except errors.PidFileLockError, err: 849 print >> sys.stderr, "Error while locking PID file:\n%s" % err 850 sys.exit(constants.EXIT_FAILURE) 851 852 try: 853 try: 854 logging.info("%s daemon startup", daemon_name) 855 if callable(prepare_fn): 856 prep_results = prepare_fn(options, args) 857 else: 858 prep_results = None 859 except Exception, err: 860 utils.WriteErrorToFD(wpipe, _BeautifyError(err)) 861 raise 862 863 if wpipe is not None: 864 # we're done with the preparation phase, we close the pipe to 865 # let the parent know it's safe to exit 866 os.close(wpipe) 867 868 exec_fn(options, args, prep_results) 869 finally: 870 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
871