Package ganeti :: Module daemon
[hide private]
[frames] | no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module with helper classes and functions for daemons""" 
 23   
 24   
 25  import asyncore 
 26  import asynchat 
 27  import collections 
 28  import os 
 29  import signal 
 30  import logging 
 31  import sched 
 32  import time 
 33  import socket 
 34  import select 
 35  import sys 
 36   
 37  from ganeti import utils 
 38  from ganeti import constants 
 39  from ganeti import errors 
 40  from ganeti import netutils 
 41  from ganeti import ssconf 
 42  from ganeti import runtime 
 43  from ganeti import compat 
class SchedulerBreakout(Exception):
  """Exception raised to leave the scheduler loop early.

  """
50
def AsyncoreDelayFunction(timeout):
  """Delay function for C{sched} which serves asyncore instead of sleeping.

  Rather than sleeping, a single iteration of the asyncore loop is run with
  the given timeout, so that asyncore events happening in the meantime get
  executed.

  Once that iteration is over, a L{SchedulerBreakout} exception is raised
  instead of returning. This forces the current scheduler.run() invocation to
  terminate, so that signals can be checked as well. The main loop will then
  call the scheduler again, which allows it to process any due events.

  This is needed because scheduler.run(), unlike the asyncore loop, doesn't
  support a count=... argument, and the scheduler module documents throwing
  exceptions from inside the delay function as an allowed usage model.

  @type timeout: number
  @param timeout: timeout passed to the asyncore loop
  @raise SchedulerBreakout: always, once the asyncore iteration is done

  """
  # Exactly one pass over the asyncore map, using poll()
  asyncore.loop(count=1, use_poll=True, timeout=timeout)
  raise SchedulerBreakout()
71
class AsyncoreScheduler(sched.scheduler):
  """C{sched.scheduler} whose delay function is integrated with asyncore.

  """
  def __init__(self, timefunc):
    """Sets up the scheduler.

    @type timefunc: callable
    @param timefunc: time function handed over to C{sched.scheduler}

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    # Only set while run() is executing with a maximum delay
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Processes pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)
    @return: whatever C{sched.scheduler.run} returns

    """
    assert self._max_delay is None

    # The scheduler is bound to a single delay function at construction time,
    # so the per-run limit has to be handed over via an instance variable.
    if max_delay is not None:
      self._max_delay = utils.RunningTimeout(max_delay, False)
    else:
      self._max_delay = None

    try:
      return sched.scheduler.run(self)
    finally:
      # Always drop the limit again, even if the delay function raised
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Delay function given to C{sched.scheduler}.

    Caps the requested duration by what is left of the maximum delay (if any)
    before delegating to L{AsyncoreDelayFunction}.

    @type duration: number
    @param duration: delay requested by the scheduler

    """
    limit = self._max_delay
    if limit is not None:
      duration = min(duration, limit.Remaining())

    return AsyncoreDelayFunction(duration)
115
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Overrides error handling and write polling of C{asyncore.dispatcher}.

  """
  # overrides asyncore.dispatcher.handle_error
  def handle_error(self):
    """Logs the error raised while handling a request and carries on.

    """
    logging.exception("Error while handling asyncore request")

  # overrides asyncore.dispatcher.writable
  def writable(self):
    """Reports the dispatcher as not interested in write events.

    Most of the time we don't want to check for writability.

    @rtype: bool
    @return: always False

    """
    return False
134
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Listening stream socket for use with asyncore.

  Every incoming connection is accepted and then passed on to
  L{handle_connection}, which is expected to hand it to a separate asyncore
  dispatcher.

  """

  # Backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # overrides asyncore.dispatcher.handle_accept
  def handle_accept(self):
    """Accepts a new client connection.

    The accepted socket and the client address are given to
    L{handle_connection}. Nothing happens if accepting returned no result.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is None:
      return

    (connected_socket, client_address) = accept_result
    if self.family == socket.AF_UNIX:
      # accept() passes nothing meaningful for unix sockets, hence the peer
      # credentials are used as client address instead
      client_address = netutils.GetSocketCredentials(connected_socket)

    peer = netutils.FormatAddress(client_address, family=self.family)
    logging.info("Accepted connection from %s", peer)
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handles an already accepted connection.

    Must be implemented by subclasses.

    @param connected_socket: socket returned by accept
    @param client_address: address (or credentials) of the client

    """
    raise NotImplementedError
186
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """Asyncore handler for a stream of terminator separated messages.

  Receives messages terminated by a defined separator over a stream
  connection; L{handle_message} is called for each complete message.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.unhandled_limit = unhandled_limit
    self.terminator = terminator
    self.set_terminator(terminator)
    # Pieces of the message currently being received
    self.ibuffer = []
    # Number of messages received and sent so far
    self.receive_count = 0
    self.send_count = 0
    # Received messages waiting to be handled, as (message, message_id)
    self.iqueue = collections.deque()
    # Messages waiting to be sent
    self.oqueue = collections.deque()

  # overrides asynchat.async_chat.collect_incoming_data
  def collect_incoming_data(self, data):
    """Stores a chunk of incoming data until the terminator is found.

    """
    self.ibuffer.append(data)

  def _can_handle_message(self):
    """Tells whether another message may be handled right away.

    That is the case if there is no limit, or if the number of unanswered
    messages is below the limit and no received message is queued.

    """
    if self.unhandled_limit is None:
      return True

    below_limit = (self.receive_count <
                   self.send_count + self.unhandled_limit)
    return below_limit and not self.iqueue

  # overrides asynchat.async_chat.found_terminator
  def found_terminator(self):
    """Assembles the received message and handles or queues it.

    """
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # The check has to happen before the counter is increased, while the
    # counter has to be increased before handle_message is called
    deliver_now = self._can_handle_message()
    self.receive_count += 1
    if not deliver_now:
      self.iqueue.append((message, message_id))
      return

    self.handle_message(message, message_id)

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # Only appending to the output queue means several threads can call this
    # function at the same time without locking, since deques are thread
    # safe. handle_write, running in the asyncore thread, takes care of
    # sending and of handling the next queued input message, if any.
    self.oqueue.append(message)

  # overrides asyncore.dispatcher.readable
  def readable(self):
    """Reads from the socket only if the next request can be handled.

    """
    if not self._can_handle_message():
      return False

    return asynchat.async_chat.readable(self)

  # overrides asyncore.dispatcher.writable
  def writable(self):
    """Tells whether there is something to be written.

    """
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    pending_in_base = asynchat.async_chat.writable(self)
    return pending_in_base or self.oqueue

  # overrides asyncore.dispatcher.handle_write
  def handle_write(self):
    """Pushes one queued reply, if any, and sends pending data.

    """
    if self.oqueue:
      # Data in the output queue means send_message was called, hence one
      # more message from the input queue can be processed, if there are any.
      reply = self.oqueue.popleft()
      self.push(reply + self.terminator)
      self.send_count += 1
      if self.iqueue:
        (queued_message, queued_id) = self.iqueue.popleft()
        self.handle_message(queued_message, queued_id)

    self.initiate_send()

  def close_log(self):
    """Logs the peer address and closes the connection.

    """
    peer = netutils.FormatAddress(self.peer_address, family=self.family)
    logging.info("Closing connection from %s", peer)
    self.close()

  # overrides asyncore.dispatcher.handle_expt
  def handle_expt(self):
    """Closes the connection.

    """
    self.close_log()

  # overrides asyncore.dispatcher.handle_error
  def handle_error(self):
    """Logs the error raised while handling a request and closes the
    connection.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
323
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """Asyncore UDP socket with an outgoing datagram queue.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family used to create the datagram socket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._family = family
    # Datagrams waiting to be sent, as (ip, port, payload)
    self._out_queue = []
    self.create_socket(family, socket.SOCK_DGRAM)

  # overrides asyncore.dispatcher.handle_connect
  def handle_connect(self):
    """Does nothing.

    """
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # overrides asyncore.dispatcher.handle_read
  def handle_read(self):
    """Receives one datagram and passes it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      return

    (payload, address) = recv_result
    if self._family == socket.AF_INET6:
      # 'flow info' and 'scope id' are not needed, hence ignored
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handles an already read udp datagram.

    Must be implemented by subclasses.

    @param payload: received data
    @param ip: source address
    @param port: source port

    """
    raise NotImplementedError

  # overrides asyncore.dispatcher.writable
  def writable(self):
    """Tells whether there are datagrams scheduled to be written.

    """
    # Checking the socket for writability is only useful if something is
    # waiting to be sent
    return len(self._out_queue) > 0

  # overrides asyncore.dispatcher.handle_write
  def handle_write(self):
    """Sends the oldest queued datagram and removes it from the queue.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return

    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # Only dequeued once sendto returned without raising
    del self._out_queue[0]

  def enqueue_send(self, ip, port, payload):
    """Enqueues a datagram to be sent when possible.

    @param ip: destination address
    @param port: destination port
    @param payload: data to send
    @raise errors.UdpDataSizeError: if the payload is bigger than
      C{constants.MAX_UDP_DATA_SIZE}

    """
    size = len(payload)
    if size > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" %
                                    (size, constants.MAX_UDP_DATA_SIZE))

    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is None or not (result & select.POLLIN):
      return False

    self.handle_read()
    return True
404
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """Helper to notify the asyncore loop that something is going on.

  In a multithreaded asyncore daemon the main loop handling asynchronous
  requests might be sleeping in select() while another thread tries to push
  data to a socket. To avoid this an instance of this class can be created,
  which other threads can use to wake the main loop up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)

    sockets = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    (self.in_socket, self.out_socket) = sockets
    # One end is only read from (and watched by asyncore), the other one is
    # only written to
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.signal_fn = signal_fn
    self.need_signal = True
    self.connected = True

  # overrides asyncore.dispatcher.handle_read
  def handle_read(self):
    """Reads the wakeup data, calls the signal function and re-arms.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # overrides asyncore.dispatcher.close
  def close(self):
    """Closes both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if not self.need_signal:
      return

    self.need_signal = False
    self.out_socket.send("\0")
458
class _ShutdownCheck:
  """Logic for L{Mainloop} shutdown.

  Wraps the shutdown callback and remembers when it asked to be called again.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function returning C{None} if mainloop can be stopped or a
      duration in seconds after which the function should be called again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    # Running timeout of the currently deferred check, if any
    self._defer = None

  def CanShutdown(self):
    """Checks whether mainloop can be stopped.

    @rtype: bool
    @return: True if the callback returned C{None}; False if a deferred check
      is still pending or the callback asked for another check later

    """
    pending = self._defer
    if pending and pending.Remaining() > 0:
      # A deferred check has already been scheduled
      return False

    # Ask mainloop driver whether we can stop or should check again
    delay = self._fn()
    if delay is not None:
      # Schedule another check in the future
      self._defer = utils.RunningTimeout(delay, True)
      return False

    # Yes, can stop mainloop
    return True
499
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    The loop ends once SIGTERM/SIGINT has been received and either no
    shutdown callback was given, the callback allowed stopping, or more than
    one such signal has been counted.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Number of shutdown signals received so far
    shutdown_signals = 0

    # Created lazily once the first shutdown signal arrived
    shutdown_waiter = None

    while True:
      if shutdown_signals < 1:
        # No shutdown requested, wait forever on I/O events
        timeout = None

      elif shutdown_signals > 1 or shutdown_wait_fn is None:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Exactly one signal and a callback to consult
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          # Raised by the delay function to get back here
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if not handler.called:
          continue

        self._CallSignalWaiters(sig)
        if sig in (signal.SIGTERM, signal.SIGINT):
          logging.info("Received signal %s asking for shutdown", sig)
          shutdown_signals += 1
        handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for receiver in self._signal_wait:
      receiver.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
603
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  Checks that the daemon has been started as the user it is intended to be
  run as.

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
    the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  # Expected uid for each known daemon
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  expected_uid = daemon_uids[daemon_name]

  return (expected_uid == running_uid, running_uid, expected_uid)
628
def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this
  will be due to socket error and such, so we try to format these cases better.

  Errors which don't carry the expected details (socket errors with fewer
  than two arguments, environment errors without C{strerror}) are formatted
  using C{str} so their message isn't lost.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      if len(err.args) >= 2:
        return ("Socket-related error: %s (errno=%s)" %
                (err.args[1], err.args[0]))
      # e.g. socket.timeout("timed out"): only a message is available
      return str(err)
    elif isinstance(err, EnvironmentError):
      if err.strerror is None:
        # Constructed with a single message; errno/strerror are unset and
        # would be rendered as "None (errno=None)"
        return str(err)
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception: # pylint: disable=W0703
    # Formatting must never hide the original error
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)
655
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
  """Handler for SIGHUP.

  Calls every callback in turn; false entries (e.g. C{None}) are skipped.

  @param reopen_fn: List of callback functions for reopening log files
  @param signum: unused
  @param frame: unused

  """
  logging.info("Reopening log files after receiving SIGHUP")

  for callback in reopen_fn:
    if not callback:
      continue
    callback()
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  Adds the common command line options, parses the command line, verifies
  SSL files and the running user, optionally daemonizes, sets up logging and
  the SIGHUP handler, writes the pid file and finally runs the daemon. The
  pid file is removed once the preparation or the daemon function is done.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      its result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options shared by all daemons
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3). This is intended, as Ganeti
    # clusters <= 2.2 can not be AF_INET6
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY
    else:
      default_bind_address = constants.IP4_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered if both defaults were given
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  (options, args) = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for (name, path) in ssl_paths.items():
      if os.path.isfile(path):
        continue
      sys.stderr.write("SSL %s file '%s' was not found" % (name, path) + "\n")
      sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  (uid_ok, running_uid, expected_uid) = _VerifyDaemonUser(daemon_name)
  if not uid_ok:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    sys.stderr.write(msg + "\n")
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  if not options.fork:
    # Running in foreground: no pipe to a parent, no stdio to reopen
    (wpipe, stdio_reopen_fn) = (None, None)
  else:
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  reopen_callbacks = [log_reopen_fn, stdio_reopen_fn]
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, reopen_callbacks))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError:
    lock_err = sys.exc_info()[1]
    sys.stderr.write("Error while locking PID file:\n%s" % lock_err + "\n")
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception:
      # Report the failure through the pipe before propagating it
      prep_err = sys.exc_info()[1]
      utils.WriteErrorToFD(wpipe, _BeautifyError(prep_err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
826