Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module with helper classes and functions for daemons""" 
 32   
 33   
 34  import asyncore 
 35  import asynchat 
 36  import collections 
 37  import os 
 38  import signal 
 39  import logging 
 40  import sched 
 41  import time 
 42  import socket 
 43  import select 
 44  import sys 
 45   
 46  from ganeti import utils 
 47  from ganeti import constants 
 48  from ganeti import errors 
 49  from ganeti import netutils 
 50  from ganeti import ssconf 
 51  from ganeti import runtime 
 52  from ganeti import compat 
class SchedulerBreakout(Exception):
  """Raised to escape from C{scheduler.run()}.

  The asyncore-based delay function throws this exception after serving
  I/O events, forcing the scheduler loop to return control to the caller
  so that pending signals can be checked.

  """
def AsyncoreDelayFunction(timeout):
  """Scheduler delay function that services asyncore instead of sleeping.

  Runs a single iteration of the asyncore event loop, waiting at most
  C{timeout} seconds for I/O, then raises L{SchedulerBreakout}. The
  exception makes the enclosing C{scheduler.run()} call return, so the
  main loop can check for signals before re-entering the scheduler; the
  scheduler module explicitly allows the delay function to raise.

  @type timeout: number
  @param timeout: maximum time to wait for asyncore events

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """C{sched.scheduler} variant whose delay function drives asyncore.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: time function handed through to C{sched.scheduler}

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    # Upper bound for a single delay; only set while run() is active
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The scheduler always invokes the same delay function, so the per-run
    # limit has to travel through an instance variable.
    if max_delay is not None:
      self._max_delay = utils.RunningTimeout(max_delay, False)
    else:
      self._max_delay = None

    try:
      return sched.scheduler.run(self)
    finally:
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Delay function capping each wait at the configured maximum.

    """
    if self._max_delay is not None:
      timeout = min(duration, self._max_delay.Remaining())
    else:
      timeout = duration

    return AsyncoreDelayFunction(timeout)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Common base class for Ganeti asyncore dispatchers.

  Provides shared error logging and a conservative default for
  writability checks.

  """
  # overrides an asyncore.dispatcher method
  def handle_error(self):
    """Log any error raised while serving a request, then keep going.

    """
    logging.exception("Error while handling asyncore request")

  # overrides an asyncore.dispatcher method
  def writable(self):
    """By default this dispatcher never waits for writability.

    Subclasses with outgoing data must override this.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # Backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # overrides an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Delegates the accepted socket to L{handle_connection}, which is
    expected to create a dedicated handler for the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # Fragments of the message currently being received
    self.ibuffer = []
    # Messages fully received / replies sent so far; their difference is
    # the number of currently unanswered messages
    self.receive_count = 0
    self.send_count = 0
    # Pending outgoing replies and deferred incoming messages; deque append
    # and popleft are atomic, which send_message relies on for thread-safety
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    """Buffer a chunk of incoming data until the terminator is seen.

    """
    self.ibuffer.append(data)

  def _can_handle_message(self):
    """Checks whether another message may be dispatched right away.

    Note: C{or} binds looser than C{and} here, so with no unhandled_limit
    this always returns True; in that case iqueue never grows, making the
    C{not self.iqueue} clause irrelevant on that path.

    """
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    """Assemble the buffered chunks and dispatch or queue the message.

    """
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    """Only read from the socket while we may dispatch further messages.

    """
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Report writability when asynchat has data or replies are queued.

    """
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Push one queued reply and, if possible, one deferred message.

    """
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the peer address and close the connection.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    """Treat exceptional socket conditions by closing the connection.

    """
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued and only written when the socket is
  reported writable.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # deque instead of list: handle_write removes from the front, and
    # deque.popleft() is O(1) while list.pop(0) is O(n)
    self._out_queue = collections.deque()
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and dispatch it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram.

    The datagram stays queued until the send attempt has been made, so a
    signal-interrupted send does not lose it.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.popleft()

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      allowed UDP datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # Each end of the pair is used in one direction only: in_socket is
    # read by the asyncore loop, out_socket is written by other threads
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.signal_fn = signal_fn
    # True while no wakeup token is in flight
    self.need_signal = True
    self.connected = True

  # overrides an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and invoke the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # overrides an asyncore.dispatcher method
  def close(self):
    """Close both ends of the internal socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # A race here merely results in more than one wakeup token being sent,
    # which is harmless
    if not self.need_signal:
      return
    self.need_signal = False
    self.out_socket.send(chr(0))
class _ShutdownCheck(object):
  """Implements the deferred-shutdown decision logic for L{Mainloop}.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function returning C{None} if mainloop can be stopped or a
      duration in seconds after which the function should be called again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    # Back-off timeout preventing the driver from being asked too often
    self._defer = None

  def CanShutdown(self):
    """Checks whether mainloop can be stopped.

    @rtype: bool

    """
    deferred = self._defer
    if deferred and deferred.Remaining() > 0:
      # Still inside the back-off window of a previous check
      return False

    # The mainloop driver decides: None means "stop now", a number means
    # "ask again after that many seconds"
    timeout = self._fn()
    if timeout is None:
      return True

    self._defer = utils.RunningTimeout(timeout, True)
    return False
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # Scheduler priority for shutdown-related events; derived from sys.maxint
  # (Python 2 only) so it sorts before any other event. Not referenced
  # within this module — presumably used by daemon code elsewhere.
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects to notify via OnSignal() when a signal arrives
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        # First shutdown signal with a callback: ask the driver whether we
        # may stop already
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      # Serve I/O directly when no timed events are pending; otherwise let
      # the scheduler drive asyncore through its delay function
      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  ents = runtime.GetEnts()
  # Map each daemon to the uid it is expected to run under
  uid_map = {
    constants.MASTERD: ents.masterd_uid,
    constants.RAPI: ents.rapi_uid,
    constants.NODED: ents.noded_uid,
    constants.CONFD: ents.confd_uid,
    }
  assert daemon_name in uid_map, "Invalid daemon %s" % daemon_name

  current_uid = os.getuid()
  expected_uid = uid_map[daemon_name]
  return (expected_uid == current_uid, current_uid, expected_uid)
def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this
  will be due to socket error and such, so we try to format these cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    # socket.error is checked first; on some Python versions it is itself
    # an EnvironmentError subclass and would otherwise be misformatted
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    if isinstance(err, EnvironmentError):
      if err.filename is not None:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
      return "%s (errno=%s)" % (err.strerror, err.errno)
    return str(err)
  except Exception: # pylint: disable=W0703
    # Formatting itself must never raise during startup error reporting
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
  """Handler for SIGHUP.

  @param reopen_fn: List of callback functions for reopening log files
  @param signum: signal number (unused, required by the signal API)
  @param frame: current stack frame (unused, required by the signal API)

  """
  logging.info("Reopening log files after receiving SIGHUP")

  # Entries may be None (e.g. when stdio was not redirected)
  for callback in reopen_fn:
    if callback:
      callback()
678 679 -def GenericMain(daemon_name, optionparser, 680 check_fn, prepare_fn, exec_fn, 681 multithreaded=False, console_logging=False, 682 default_ssl_cert=None, default_ssl_key=None, 683 warn_breach=False):
684 """Shared main function for daemons. 685 686 @type daemon_name: string 687 @param daemon_name: daemon name 688 @type optionparser: optparse.OptionParser 689 @param optionparser: initialized optionparser with daemon-specific options 690 (common -f -d options will be handled by this module) 691 @type check_fn: function which accepts (options, args) 692 @param check_fn: function that checks start conditions and exits if they're 693 not met 694 @type prepare_fn: function which accepts (options, args) 695 @param prepare_fn: function that is run before forking, or None; 696 it's result will be passed as the third parameter to exec_fn, or 697 if None was passed in, we will just pass None to exec_fn 698 @type exec_fn: function which accepts (options, args, prepare_results) 699 @param exec_fn: function that's executed with the daemon's pid file held, and 700 runs the daemon itself. 701 @type multithreaded: bool 702 @param multithreaded: Whether the daemon uses threads 703 @type console_logging: boolean 704 @param console_logging: if True, the daemon will fall back to the system 705 console if logging fails 706 @type default_ssl_cert: string 707 @param default_ssl_cert: Default SSL certificate path 708 @type default_ssl_key: string 709 @param default_ssl_key: Default SSL key path 710 @type warn_breach: bool 711 @param warn_breach: issue a warning at daemon launch time, before 712 daemonizing, about the possibility of breaking parameter privacy 713 invariants through the otherwise helpful debug logging. 
714 715 """ 716 optionparser.add_option("-f", "--foreground", dest="fork", 717 help="Don't detach from the current terminal", 718 default=True, action="store_false") 719 optionparser.add_option("-d", "--debug", dest="debug", 720 help="Enable some debug messages", 721 default=False, action="store_true") 722 optionparser.add_option("--syslog", dest="syslog", 723 help="Enable logging to syslog (except debug" 724 " messages); one of 'no', 'yes' or 'only' [%s]" % 725 constants.SYSLOG_USAGE, 726 default=constants.SYSLOG_USAGE, 727 choices=["no", "yes", "only"]) 728 729 family = ssconf.SimpleStore().GetPrimaryIPFamily() 730 # family will default to AF_INET if there is no ssconf file (e.g. when 731 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters 732 # <= 2.2 can not be AF_INET6 733 if daemon_name in constants.DAEMONS_PORTS: 734 default_bind_address = constants.IP4_ADDRESS_ANY 735 if family == netutils.IP6Address.family: 736 default_bind_address = constants.IP6_ADDRESS_ANY 737 738 default_port = netutils.GetDaemonPort(daemon_name) 739 740 # For networked daemons we allow choosing the port and bind address 741 optionparser.add_option("-p", "--port", dest="port", 742 help="Network port (default: %s)" % default_port, 743 default=default_port, type="int") 744 optionparser.add_option("-b", "--bind", dest="bind_address", 745 help=("Bind address (default: '%s')" % 746 default_bind_address), 747 default=default_bind_address, metavar="ADDRESS") 748 optionparser.add_option("-i", "--interface", dest="bind_interface", 749 help=("Bind interface"), metavar="INTERFACE") 750 751 if default_ssl_key is not None and default_ssl_cert is not None: 752 optionparser.add_option("--no-ssl", dest="ssl", 753 help="Do not secure HTTP protocol with SSL", 754 default=True, action="store_false") 755 optionparser.add_option("-K", "--ssl-key", dest="ssl_key", 756 help=("SSL key path (default: %s)" % 757 default_ssl_key), 758 default=default_ssl_key, type="string", 759 
metavar="SSL_KEY_PATH") 760 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", 761 help=("SSL certificate path (default: %s)" % 762 default_ssl_cert), 763 default=default_ssl_cert, type="string", 764 metavar="SSL_CERT_PATH") 765 766 # Disable the use of fork(2) if the daemon uses threads 767 if multithreaded: 768 utils.DisableFork() 769 770 options, args = optionparser.parse_args() 771 772 if getattr(options, "bind_interface", None) is not None: 773 if options.bind_address != default_bind_address: 774 msg = ("Can't specify both, bind address (%s) and bind interface (%s)" % 775 (options.bind_address, options.bind_interface)) 776 print >> sys.stderr, msg 777 sys.exit(constants.EXIT_FAILURE) 778 interface_ip_addresses = \ 779 netutils.GetInterfaceIpAddresses(options.bind_interface) 780 if family == netutils.IP6Address.family: 781 if_addresses = interface_ip_addresses[constants.IP6_VERSION] 782 else: 783 if_addresses = interface_ip_addresses[constants.IP4_VERSION] 784 if len(if_addresses) < 1: 785 msg = "Failed to find IP for interface %s" % options.bind_interace 786 print >> sys.stderr, msg 787 sys.exit(constants.EXIT_FAILURE) 788 options.bind_address = if_addresses[0] 789 790 if getattr(options, "ssl", False): 791 ssl_paths = { 792 "certificate": options.ssl_cert, 793 "key": options.ssl_key, 794 } 795 796 for name, path in ssl_paths.iteritems(): 797 if not os.path.isfile(path): 798 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path) 799 sys.exit(constants.EXIT_FAILURE) 800 801 # TODO: By initiating http.HttpSslParams here we would only read the files 802 # once and have a proper validation (isfile returns False on directories) 803 # at the same time. 
804 805 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name) 806 if not result: 807 msg = ("%s started using wrong user ID (%d), expected %d" % 808 (daemon_name, running_uid, expected_uid)) 809 print >> sys.stderr, msg 810 sys.exit(constants.EXIT_FAILURE) 811 812 if check_fn is not None: 813 check_fn(options, args) 814 815 log_filename = constants.DAEMONS_LOGFILES[daemon_name] 816 817 # node-daemon logging in lib/http/server.py, _HandleServerRequestInner 818 if options.debug and warn_breach: 819 sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name) 820 821 if options.fork: 822 # Newer GnuTLS versions (>= 3.3.0) use a library constructor for 823 # initialization and open /dev/urandom on library load time, way before we 824 # fork(). Closing /dev/urandom causes subsequent ganeti.http.client 825 # requests to fail and the process to receive a SIGABRT. As we cannot 826 # reliably detect GnuTLS's socket, we work our way around this by keeping 827 # all fds referring to /dev/urandom open. 828 noclose_fds = [] 829 for fd in os.listdir("/proc/self/fd"): 830 try: 831 if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom": 832 noclose_fds.append(int(fd)) 833 except EnvironmentError: 834 # The fd might have disappeared (although it shouldn't as we're running 835 # single-threaded). 
836 continue 837 838 utils.CloseFDs(noclose_fds=noclose_fds) 839 (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename) 840 else: 841 (wpipe, stdio_reopen_fn) = (None, None) 842 843 log_reopen_fn = \ 844 utils.SetupLogging(log_filename, daemon_name, 845 debug=options.debug, 846 stderr_logging=not options.fork, 847 multithreaded=multithreaded, 848 syslog=options.syslog, 849 console_logging=console_logging) 850 851 # Reopen log file(s) on SIGHUP 852 signal.signal(signal.SIGHUP, 853 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn])) 854 855 try: 856 utils.WritePidFile(utils.DaemonPidFileName(daemon_name)) 857 except errors.PidFileLockError, err: 858 print >> sys.stderr, "Error while locking PID file:\n%s" % err 859 sys.exit(constants.EXIT_FAILURE) 860 861 try: 862 try: 863 logging.info("%s daemon startup", daemon_name) 864 if callable(prepare_fn): 865 prep_results = prepare_fn(options, args) 866 else: 867 prep_results = None 868 except Exception, err: 869 utils.WriteErrorToFD(wpipe, _BeautifyError(err)) 870 raise 871 872 if wpipe is not None: 873 # we're done with the preparation phase, we close the pipe to 874 # let the parent know it's safe to exit 875 os.close(wpipe) 876 877 exec_fn(options, args, prep_results) 878 finally: 879 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
880