Package ganeti :: Module daemon
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module with helper classes and functions for daemons""" 
 32   
 33   
 34  import asyncore 
 35  import asynchat 
 36  import collections 
 37  import os 
 38  import signal 
 39  import logging 
 40  import sched 
 41  import time 
 42  import socket 
 43  import select 
 44  import sys 
 45   
 46  from ganeti import utils 
 47  from ganeti import constants 
 48  from ganeti import errors 
 49  from ganeti import netutils 
 50  from ganeti import ssconf 
 51  from ganeti import runtime 
 52  from ganeti import compat 
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to break out of scheduler.run().

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of sleeping for C{timeout} seconds, this delay function runs one
  iteration of the asyncore loop, processing any I/O events that occur in
  the meantime.

  Once that iteration completes it does not return normally; it raises
  L{SchedulerBreakout}, forcing the current scheduler.run() invocation to
  terminate so that the caller can check for signals before running the
  scheduler again and processing any due events. This is necessary because
  scheduler.run() has no count=... argument like the asyncore loop, and the
  sched module documents raising from the delay function as a supported way
  to interrupt it.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Installs L{AsyncoreDelayFunction} (via L{_LimitedDelay}) as the
  scheduler's delay function, so asyncore I/O events keep being processed
  while waiting for timed events.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. C{time.time}),
      passed through to C{sched.scheduler}

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    # Set by run() for the duration of one invocation and read by
    # _LimitedDelay(); None means "no upper bound on the delay"
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    # Nested/concurrent runs are not supported
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      # Always reset, even when run() is terminated by SchedulerBreakout
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    Caps the requested delay at the time remaining on the C{max_delay}
    timeout passed to L{run}, if one was given.

    """
    if self._max_delay is None:
      timeout = duration
    else:
      timeout = min(duration, self._max_delay.Remaining())

    return AsyncoreDelayFunction(timeout)
124
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Common base class for Ganeti's asyncore dispatchers.

  Provides two shared defaults: exceptions raised while handling a request
  are logged instead of propagating (which would kill the event loop), and
  sockets are treated as non-writable unless a subclass overrides that.

  """
  # Overrides asyncore.dispatcher.handle_error
  def handle_error(self):
    """Log any error raised while handling a request, then carry on.

    """
    logging.exception("Error while handling asyncore request")

  # Overrides asyncore.dispatcher.writable
  def writable(self):
    """By default, never ask the loop to check this socket for writability.

    """
    return False
143
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  Flow control: at most C{unhandled_limit} messages may be outstanding
  (received but not yet answered via L{send_message}); further complete
  messages are parked in C{iqueue} and replayed as replies are sent.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # Partial-message buffer; joined into one string at each terminator
    self.ibuffer = []
    # Number of complete messages received / replies sent so far
    self.receive_count = 0
    self.send_count = 0
    # Outgoing replies (send_message) and deferred incoming messages
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # Accumulate raw chunks until found_terminator() fires
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when no limit is set, or fewer than unhandled_limit messages are
    # outstanding and no earlier message is already queued.
    # NOTE(review): relies on "and" binding tighter than "or"; harmless
    # because iqueue only fills when unhandled_limit is not None.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A complete message has arrived; dispatch or defer it
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    Meant to be overridden by subclasses.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the peer address and close the connection.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # Exceptional socket condition: log and drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and dispatches each complete incoming datagram
  to L{handle_datagram}.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family, e.g. C{socket.AF_INET} or
      C{socket.AF_INET6}

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples awaiting transmission
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Receive one datagram and hand it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram, if any.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      allowed datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    # in_socket is watched by the asyncore loop; out_socket is written to
    # by signal(), possibly from other threads
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # Each end of the pair is used in one direction only
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # True while no wakeup token is in flight; limits socketpair traffic
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and invoke the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socketpair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send(chr(0))
class _ShutdownCheck(object):
  """Decides when L{Mainloop} may actually terminate after a shutdown request.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Callback returning C{None} once the main loop may stop, or a
      number of seconds after which it should be consulted again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    self._defer = None

  def CanShutdown(self):
    """Returns whether the main loop may be stopped now.

    @rtype: bool

    """
    deferred = self._defer
    if deferred and deferred.Remaining() > 0:
      # Still within the delay requested by an earlier check
      return False

    # Consult the mainloop driver again
    next_check = self._fn()

    if next_check is None:
      # Driver agrees the main loop can stop
      return True

    # Driver wants to be asked again after "next_check" seconds
    self._defer = utils.RunningTimeout(next_check, True)

    return False
456
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # Priority value for shutdown-timeout events; very low (negative) so it
  # sorts before regular events
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal()
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  # Stacked decorators install one handler per signal; each contributes an
  # entry to the signal_handlers dict passed to Run()
  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      # With no timed events pending, run asyncore directly; otherwise let
      # the scheduler drive, which runs asyncore via its delay function
      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Checks that the process runs under the uid configured for the daemon.

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
    the second item current uid and third with expected uid

  """
  ents = runtime.GetEnts()
  actual_uid = os.getuid()
  uid_by_daemon = {
    constants.MASTERD: ents.masterd_uid,
    constants.RAPI: ents.rapi_uid,
    constants.NODED: ents.noded_uid,
    constants.CONFD: ents.confd_uid,
    }
  assert daemon_name in uid_by_daemon, "Invalid daemon %s" % daemon_name

  expected_uid = uid_by_daemon[daemon_name]
  return (expected_uid == actual_uid, actual_uid, expected_uid)
585
586 587 -def _BeautifyError(err):
588 """Try to format an error better. 589 590 Since we're dealing with daemon startup errors, in many cases this 591 will be due to socket error and such, so we try to format these cases better. 592 593 @param err: an exception object 594 @rtype: string 595 @return: the formatted error description 596 597 """ 598 try: 599 if isinstance(err, socket.error): 600 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0]) 601 elif isinstance(err, EnvironmentError): 602 if err.filename is None: 603 return "%s (errno=%s)" % (err.strerror, err.errno) 604 else: 605 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename, 606 err.errno) 607 else: 608 return str(err) 609 except Exception: # pylint: disable=W0703 610 logging.exception("Error while handling existing error %s", err) 611 return "%s" % str(err)
612
613 614 -def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
615 """Handler for SIGHUP. 616 617 @param reopen_fn: List of callback functions for reopening log files 618 619 """ 620 logging.info("Reopening log files after receiving SIGHUP") 621 622 for fn in reopen_fn: 623 if fn: 624 fn() 625
626 627 -def GenericMain(daemon_name, optionparser, 628 check_fn, prepare_fn, exec_fn, 629 multithreaded=False, console_logging=False, 630 default_ssl_cert=None, default_ssl_key=None, 631 warn_breach=False):
632 """Shared main function for daemons. 633 634 @type daemon_name: string 635 @param daemon_name: daemon name 636 @type optionparser: optparse.OptionParser 637 @param optionparser: initialized optionparser with daemon-specific options 638 (common -f -d options will be handled by this module) 639 @type check_fn: function which accepts (options, args) 640 @param check_fn: function that checks start conditions and exits if they're 641 not met 642 @type prepare_fn: function which accepts (options, args) 643 @param prepare_fn: function that is run before forking, or None; 644 it's result will be passed as the third parameter to exec_fn, or 645 if None was passed in, we will just pass None to exec_fn 646 @type exec_fn: function which accepts (options, args, prepare_results) 647 @param exec_fn: function that's executed with the daemon's pid file held, and 648 runs the daemon itself. 649 @type multithreaded: bool 650 @param multithreaded: Whether the daemon uses threads 651 @type console_logging: boolean 652 @param console_logging: if True, the daemon will fall back to the system 653 console if logging fails 654 @type default_ssl_cert: string 655 @param default_ssl_cert: Default SSL certificate path 656 @type default_ssl_key: string 657 @param default_ssl_key: Default SSL key path 658 @type warn_breach: bool 659 @param warn_breach: issue a warning at daemon launch time, before 660 daemonizing, about the possibility of breaking parameter privacy 661 invariants through the otherwise helpful debug logging. 
662 663 """ 664 optionparser.add_option("-f", "--foreground", dest="fork", 665 help="Don't detach from the current terminal", 666 default=True, action="store_false") 667 optionparser.add_option("-d", "--debug", dest="debug", 668 help="Enable some debug messages", 669 default=False, action="store_true") 670 optionparser.add_option("--syslog", dest="syslog", 671 help="Enable logging to syslog (except debug" 672 " messages); one of 'no', 'yes' or 'only' [%s]" % 673 constants.SYSLOG_USAGE, 674 default=constants.SYSLOG_USAGE, 675 choices=["no", "yes", "only"]) 676 677 family = ssconf.SimpleStore().GetPrimaryIPFamily() 678 # family will default to AF_INET if there is no ssconf file (e.g. when 679 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters 680 # <= 2.2 can not be AF_INET6 681 if daemon_name in constants.DAEMONS_PORTS: 682 default_bind_address = constants.IP4_ADDRESS_ANY 683 if family == netutils.IP6Address.family: 684 default_bind_address = constants.IP6_ADDRESS_ANY 685 686 default_port = netutils.GetDaemonPort(daemon_name) 687 688 # For networked daemons we allow choosing the port and bind address 689 optionparser.add_option("-p", "--port", dest="port", 690 help="Network port (default: %s)" % default_port, 691 default=default_port, type="int") 692 optionparser.add_option("-b", "--bind", dest="bind_address", 693 help=("Bind address (default: '%s')" % 694 default_bind_address), 695 default=default_bind_address, metavar="ADDRESS") 696 optionparser.add_option("-i", "--interface", dest="bind_interface", 697 help=("Bind interface"), metavar="INTERFACE") 698 699 if default_ssl_key is not None and default_ssl_cert is not None: 700 optionparser.add_option("--no-ssl", dest="ssl", 701 help="Do not secure HTTP protocol with SSL", 702 default=True, action="store_false") 703 optionparser.add_option("-K", "--ssl-key", dest="ssl_key", 704 help=("SSL key path (default: %s)" % 705 default_ssl_key), 706 default=default_ssl_key, type="string", 707 
metavar="SSL_KEY_PATH") 708 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", 709 help=("SSL certificate path (default: %s)" % 710 default_ssl_cert), 711 default=default_ssl_cert, type="string", 712 metavar="SSL_CERT_PATH") 713 714 # Disable the use of fork(2) if the daemon uses threads 715 if multithreaded: 716 utils.DisableFork() 717 718 options, args = optionparser.parse_args() 719 720 if getattr(options, "bind_interface", None) is not None: 721 if options.bind_address != default_bind_address: 722 msg = ("Can't specify both, bind address (%s) and bind interface (%s)" % 723 (options.bind_address, options.bind_interface)) 724 print >> sys.stderr, msg 725 sys.exit(constants.EXIT_FAILURE) 726 interface_ip_addresses = \ 727 netutils.GetInterfaceIpAddresses(options.bind_interface) 728 if family == netutils.IP6Address.family: 729 if_addresses = interface_ip_addresses[constants.IP6_VERSION] 730 else: 731 if_addresses = interface_ip_addresses[constants.IP4_VERSION] 732 if len(if_addresses) < 1: 733 msg = "Failed to find IP for interface %s" % options.bind_interace 734 print >> sys.stderr, msg 735 sys.exit(constants.EXIT_FAILURE) 736 options.bind_address = if_addresses[0] 737 738 if getattr(options, "ssl", False): 739 ssl_paths = { 740 "certificate": options.ssl_cert, 741 "key": options.ssl_key, 742 } 743 744 for name, path in ssl_paths.iteritems(): 745 if not os.path.isfile(path): 746 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path) 747 sys.exit(constants.EXIT_FAILURE) 748 749 # TODO: By initiating http.HttpSslParams here we would only read the files 750 # once and have a proper validation (isfile returns False on directories) 751 # at the same time. 
752 753 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name) 754 if not result: 755 msg = ("%s started using wrong user ID (%d), expected %d" % 756 (daemon_name, running_uid, expected_uid)) 757 print >> sys.stderr, msg 758 sys.exit(constants.EXIT_FAILURE) 759 760 if check_fn is not None: 761 check_fn(options, args) 762 763 log_filename = constants.DAEMONS_LOGFILES[daemon_name] 764 765 # node-daemon logging in lib/http/server.py, _HandleServerRequestInner 766 if options.debug and warn_breach: 767 sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name) 768 769 if options.fork: 770 # Newer GnuTLS versions (>= 3.3.0) use a library constructor for 771 # initialization and open /dev/urandom on library load time, way before we 772 # fork(). Closing /dev/urandom causes subsequent ganeti.http.client 773 # requests to fail and the process to receive a SIGABRT. As we cannot 774 # reliably detect GnuTLS's socket, we work our way around this by keeping 775 # all fds referring to /dev/urandom open. 776 noclose_fds = [] 777 for fd in os.listdir("/proc/self/fd"): 778 try: 779 if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom": 780 noclose_fds.append(int(fd)) 781 except EnvironmentError: 782 # The fd might have disappeared (although it shouldn't as we're running 783 # single-threaded). 
784 continue 785 786 utils.CloseFDs(noclose_fds=noclose_fds) 787 (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename) 788 else: 789 (wpipe, stdio_reopen_fn) = (None, None) 790 791 log_reopen_fn = \ 792 utils.SetupLogging(log_filename, daemon_name, 793 debug=options.debug, 794 stderr_logging=not options.fork, 795 multithreaded=multithreaded, 796 syslog=options.syslog, 797 console_logging=console_logging) 798 799 # Reopen log file(s) on SIGHUP 800 signal.signal(signal.SIGHUP, 801 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn])) 802 803 try: 804 utils.WritePidFile(utils.DaemonPidFileName(daemon_name)) 805 except errors.PidFileLockError, err: 806 print >> sys.stderr, "Error while locking PID file:\n%s" % err 807 sys.exit(constants.EXIT_FAILURE) 808 809 try: 810 try: 811 logging.info("%s daemon startup", daemon_name) 812 if callable(prepare_fn): 813 prep_results = prepare_fn(options, args) 814 else: 815 prep_results = None 816 except Exception, err: 817 utils.WriteErrorToFD(wpipe, _BeautifyError(err)) 818 raise 819 820 if wpipe is not None: 821 # we're done with the preparation phase, we close the pipe to 822 # let the parent know it's safe to exit 823 os.close(wpipe) 824 825 exec_fn(options, args, prep_results) 826 finally: 827 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
828