Package ganeti :: Module daemon
[hide private]
[frames] | no frames]

Source Code for Module ganeti.daemon

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module with helper classes and functions for daemons""" 
 23   
 24   
 25  import asyncore 
 26  import os 
 27  import signal 
 28  import logging 
 29  import sched 
 30  import time 
 31  import socket 
 32  import select 
 33  import sys 
 34   
 35  from ganeti import utils 
 36  from ganeti import constants 
 37  from ganeti import errors 
38 39 40 -class SchedulerBreakout(Exception):
41 """Exception used to get out of the scheduler loop 42 43 """
44
45 46 -def AsyncoreDelayFunction(timeout):
47 """Asyncore-compatible scheduler delay function. 48 49 This is a delay function for sched that, rather than actually sleeping, 50 executes asyncore events happening in the meantime. 51 52 After an event has occurred, rather than returning, it raises a 53 SchedulerBreakout exception, which will force the current scheduler.run() 54 invocation to terminate, so that we can also check for signals. The main loop 55 will then call the scheduler run again, which will allow it to actually 56 process any due events. 57 58 This is needed because scheduler.run() doesn't support a count=..., as 59 asyncore loop, and the scheduler module documents throwing exceptions from 60 inside the delay function as an allowed usage model. 61 62 """ 63 asyncore.loop(timeout=timeout, count=1, use_poll=True) 64 raise SchedulerBreakout()
65
66 67 -class AsyncoreScheduler(sched.scheduler):
68 """Event scheduler integrated with asyncore 69 70 """
71 - def __init__(self, timefunc):
72 sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74 75 -class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
76 """Base Ganeti Asyncore Dispacher 77 78 """ 79 # this method is overriding an asyncore.dispatcher method
80 - def handle_error(self):
81 """Log an error in handling any request, and proceed. 82 83 """ 84 logging.exception("Error while handling asyncore request")
85 86 # this method is overriding an asyncore.dispatcher method
87 - def writable(self):
88 """Most of the time we don't want to check for writability. 89 90 """ 91 return False
92
93 94 -class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
95 """An improved asyncore udp socket. 96 97 """
98 - def __init__(self):
99 """Constructor for AsyncUDPSocket 100 101 """ 102 GanetiBaseAsyncoreDispatcher.__init__(self) 103 self._out_queue = [] 104 self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
105 106 # this method is overriding an asyncore.dispatcher method
107 - def handle_connect(self):
108 # Python thinks that the first udp message from a source qualifies as a 109 # "connect" and further ones are part of the same connection. We beg to 110 # differ and treat all messages equally. 111 pass
112 113 # this method is overriding an asyncore.dispatcher method
114 - def handle_read(self):
115 recv_result = utils.IgnoreSignals(self.recvfrom, 116 constants.MAX_UDP_DATA_SIZE) 117 if recv_result is not None: 118 payload, address = recv_result 119 ip, port = address 120 self.handle_datagram(payload, ip, port)
121
122 - def handle_datagram(self, payload, ip, port):
123 """Handle an already read udp datagram 124 125 """ 126 raise NotImplementedError
127 128 # this method is overriding an asyncore.dispatcher method
129 - def writable(self):
130 # We should check whether we can write to the socket only if we have 131 # something scheduled to be written 132 return bool(self._out_queue)
133 134 # this method is overriding an asyncore.dispatcher method
135 - def handle_write(self):
136 if not self._out_queue: 137 logging.error("handle_write called with empty output queue") 138 return 139 (ip, port, payload) = self._out_queue[0] 140 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port)) 141 self._out_queue.pop(0)
142
143 - def enqueue_send(self, ip, port, payload):
144 """Enqueue a datagram to be sent when possible 145 146 """ 147 if len(payload) > constants.MAX_UDP_DATA_SIZE: 148 raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload), 149 constants.MAX_UDP_DATA_SIZE)) 150 self._out_queue.append((ip, port, payload))
151
152 - def process_next_packet(self, timeout=0):
153 """Process the next datagram, waiting for it if necessary. 154 155 @type timeout: float 156 @param timeout: how long to wait for data 157 @rtype: boolean 158 @return: True if some data has been handled, False otherwise 159 160 """ 161 result = utils.WaitForFdCondition(self, select.POLLIN, timeout) 162 if result is not None and result & select.POLLIN: 163 self.handle_read() 164 return True 165 else: 166 return False
167
168 169 -class Mainloop(object):
170 """Generic mainloop for daemons 171 172 @ivar scheduler: A sched.scheduler object, which can be used to register 173 timed events 174 175 """
176 - def __init__(self):
177 """Constructs a new Mainloop instance. 178 179 """ 180 self._signal_wait = [] 181 self.scheduler = AsyncoreScheduler(time.time)
182 183 @utils.SignalHandled([signal.SIGCHLD]) 184 @utils.SignalHandled([signal.SIGTERM]) 185 @utils.SignalHandled([signal.SIGINT])
186 - def Run(self, signal_handlers=None):
187 """Runs the mainloop. 188 189 @type signal_handlers: dict 190 @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator 191 192 """ 193 assert isinstance(signal_handlers, dict) and \ 194 len(signal_handlers) > 0, \ 195 "Broken SignalHandled decorator" 196 running = True 197 # Start actual main loop 198 while running: 199 if not self.scheduler.empty(): 200 try: 201 self.scheduler.run() 202 except SchedulerBreakout: 203 pass 204 else: 205 asyncore.loop(count=1, use_poll=True) 206 207 # Check whether a signal was raised 208 for sig in signal_handlers: 209 handler = signal_handlers[sig] 210 if handler.called: 211 self._CallSignalWaiters(sig) 212 running = sig not in (signal.SIGTERM, signal.SIGINT) 213 handler.Clear()
214
215 - def _CallSignalWaiters(self, signum):
216 """Calls all signal waiters for a certain signal. 217 218 @type signum: int 219 @param signum: Signal number 220 221 """ 222 for owner in self._signal_wait: 223 owner.OnSignal(signum)
224
225 - def RegisterSignal(self, owner):
226 """Registers a receiver for signal notifications 227 228 The receiver must support a "OnSignal(self, signum)" function. 229 230 @type owner: instance 231 @param owner: Receiver 232 233 """ 234 self._signal_wait.append(owner)
235
236 237 -def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn, 238 console_logging=False):
239 """Shared main function for daemons. 240 241 @type daemon_name: string 242 @param daemon_name: daemon name 243 @type optionparser: optparse.OptionParser 244 @param optionparser: initialized optionparser with daemon-specific options 245 (common -f -d options will be handled by this module) 246 @type dirs: list of (string, integer) 247 @param dirs: list of directories that must be created if they don't exist, 248 and the permissions to be used to create them 249 @type check_fn: function which accepts (options, args) 250 @param check_fn: function that checks start conditions and exits if they're 251 not met 252 @type exec_fn: function which accepts (options, args) 253 @param exec_fn: function that's executed with the daemon's pid file held, and 254 runs the daemon itself. 255 @type console_logging: boolean 256 @param console_logging: if True, the daemon will fall back to the system 257 console if logging fails 258 259 """ 260 optionparser.add_option("-f", "--foreground", dest="fork", 261 help="Don't detach from the current terminal", 262 default=True, action="store_false") 263 optionparser.add_option("-d", "--debug", dest="debug", 264 help="Enable some debug messages", 265 default=False, action="store_true") 266 optionparser.add_option("--syslog", dest="syslog", 267 help="Enable logging to syslog (except debug" 268 " messages); one of 'no', 'yes' or 'only' [%s]" % 269 constants.SYSLOG_USAGE, 270 default=constants.SYSLOG_USAGE, 271 choices=["no", "yes", "only"]) 272 if daemon_name in constants.DAEMONS_PORTS: 273 # for networked daemons we also allow choosing the bind port and address. 274 # by default we use the port provided by utils.GetDaemonPort, and bind to 275 # 0.0.0.0 (which is represented by and empty bind address. 276 port = utils.GetDaemonPort(daemon_name) 277 optionparser.add_option("-p", "--port", dest="port", 278 help="Network port (%s default)." % port, 279 default=port, type="int") 280 optionparser.add_option("-b", "--bind", dest="bind_address", 281 help="Bind address", 282 default="", metavar="ADDRESS") 283 284 if daemon_name in constants.DAEMONS_SSL: 285 default_cert, default_key = constants.DAEMONS_SSL[daemon_name] 286 optionparser.add_option("--no-ssl", dest="ssl", 287 help="Do not secure HTTP protocol with SSL", 288 default=True, action="store_false") 289 optionparser.add_option("-K", "--ssl-key", dest="ssl_key", 290 help="SSL key", 291 default=default_key, type="string") 292 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", 293 help="SSL certificate", 294 default=default_cert, type="string") 295 296 multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS 297 298 options, args = optionparser.parse_args() 299 300 if hasattr(options, 'ssl') and options.ssl: 301 if not (options.ssl_cert and options.ssl_key): 302 print >> sys.stderr, "Need key and certificate to use ssl" 303 sys.exit(constants.EXIT_FAILURE) 304 for fname in (options.ssl_cert, options.ssl_key): 305 if not os.path.isfile(fname): 306 print >> sys.stderr, "Need ssl file %s to run" % fname 307 sys.exit(constants.EXIT_FAILURE) 308 309 if check_fn is not None: 310 check_fn(options, args) 311 312 utils.EnsureDirs(dirs) 313 314 if options.fork: 315 utils.CloseFDs() 316 utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name]) 317 318 utils.WritePidFile(daemon_name) 319 try: 320 utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name], 321 debug=options.debug, 322 stderr_logging=not options.fork, 323 multithreaded=multithread, 324 program=daemon_name, 325 syslog=options.syslog, 326 console_logging=console_logging) 327 logging.info("%s daemon startup", daemon_name) 328 exec_fn(options, args) 329 finally: 330 utils.RemovePidFile(daemon_name)
331