1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import logging
29 import sched
30 import time
31 import socket
32 import select
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
41 """Exception used to get out of the scheduler loop
42
43 """
44
47 """Asyncore-compatible scheduler delay function.
48
49 This is a delay function for sched that, rather than actually sleeping,
50 executes asyncore events happening in the meantime.
51
52 After an event has occurred, rather than returning, it raises a
53 SchedulerBreakout exception, which will force the current scheduler.run()
54 invocation to terminate, so that we can also check for signals. The main loop
55 will then call the scheduler run again, which will allow it to actually
56 process any due events.
57
58 This is needed because scheduler.run() doesn't support a count=..., as
59 asyncore loop, and the scheduler module documents throwing exceptions from
60 inside the delay function as an allowed usage model.
61
62 """
63 asyncore.loop(timeout=timeout, count=1, use_poll=True)
64 raise SchedulerBreakout()
65
68 """Event scheduler integrated with asyncore
69
70 """
73
76 """Base Ganeti Asyncore Dispacher
77
78 """
79
81 """Log an error in handling any request, and proceed.
82
83 """
84 logging.exception("Error while handling asyncore request")
85
86
88 """Most of the time we don't want to check for writability.
89
90 """
91 return False
92
95 """An improved asyncore udp socket.
96
97 """
105
106
112
113
121
123 """Handle an already read udp datagram
124
125 """
126 raise NotImplementedError
127
128
130
131
132 return bool(self._out_queue)
133
134
136 if not self._out_queue:
137 logging.error("handle_write called with empty output queue")
138 return
139 (ip, port, payload) = self._out_queue[0]
140 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
141 self._out_queue.pop(0)
142
151
153 """Process the next datagram, waiting for it if necessary.
154
155 @type timeout: float
156 @param timeout: how long to wait for data
157 @rtype: boolean
158 @return: True if some data has been handled, False otherwise
159
160 """
161 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
162 if result is not None and result & select.POLLIN:
163 self.handle_read()
164 return True
165 else:
166 return False
167
168
169 -class Mainloop(object):
170 """Generic mainloop for daemons
171
172 @ivar scheduler: A sched.scheduler object, which can be used to register
173 timed events
174
175 """
176 - def __init__(self):
177 """Constructs a new Mainloop instance.
178
179 """
180 self._signal_wait = []
181 self.scheduler = AsyncoreScheduler(time.time)
182
183 @utils.SignalHandled([signal.SIGCHLD])
184 @utils.SignalHandled([signal.SIGTERM])
185 @utils.SignalHandled([signal.SIGINT])
186 - def Run(self, signal_handlers=None):
187 """Runs the mainloop.
188
189 @type signal_handlers: dict
190 @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
191
192 """
193 assert isinstance(signal_handlers, dict) and \
194 len(signal_handlers) > 0, \
195 "Broken SignalHandled decorator"
196 running = True
197
198 while running:
199 if not self.scheduler.empty():
200 try:
201 self.scheduler.run()
202 except SchedulerBreakout:
203 pass
204 else:
205 asyncore.loop(count=1, use_poll=True)
206
207
208 for sig in signal_handlers:
209 handler = signal_handlers[sig]
210 if handler.called:
211 self._CallSignalWaiters(sig)
212 running = sig not in (signal.SIGTERM, signal.SIGINT)
213 handler.Clear()
214
215 - def _CallSignalWaiters(self, signum):
216 """Calls all signal waiters for a certain signal.
217
218 @type signum: int
219 @param signum: Signal number
220
221 """
222 for owner in self._signal_wait:
223 owner.OnSignal(signum)
224
225 - def RegisterSignal(self, owner):
226 """Registers a receiver for signal notifications
227
228 The receiver must support a "OnSignal(self, signum)" function.
229
230 @type owner: instance
231 @param owner: Receiver
232
233 """
234 self._signal_wait.append(owner)
235
236
237 -def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
238 console_logging=False):
239 """Shared main function for daemons.
240
241 @type daemon_name: string
242 @param daemon_name: daemon name
243 @type optionparser: optparse.OptionParser
244 @param optionparser: initialized optionparser with daemon-specific options
245 (common -f -d options will be handled by this module)
246 @type dirs: list of (string, integer)
247 @param dirs: list of directories that must be created if they don't exist,
248 and the permissions to be used to create them
249 @type check_fn: function which accepts (options, args)
250 @param check_fn: function that checks start conditions and exits if they're
251 not met
252 @type exec_fn: function which accepts (options, args)
253 @param exec_fn: function that's executed with the daemon's pid file held, and
254 runs the daemon itself.
255 @type console_logging: boolean
256 @param console_logging: if True, the daemon will fall back to the system
257 console if logging fails
258
259 """
260 optionparser.add_option("-f", "--foreground", dest="fork",
261 help="Don't detach from the current terminal",
262 default=True, action="store_false")
263 optionparser.add_option("-d", "--debug", dest="debug",
264 help="Enable some debug messages",
265 default=False, action="store_true")
266 optionparser.add_option("--syslog", dest="syslog",
267 help="Enable logging to syslog (except debug"
268 " messages); one of 'no', 'yes' or 'only' [%s]" %
269 constants.SYSLOG_USAGE,
270 default=constants.SYSLOG_USAGE,
271 choices=["no", "yes", "only"])
272 if daemon_name in constants.DAEMONS_PORTS:
273
274
275
276 port = utils.GetDaemonPort(daemon_name)
277 optionparser.add_option("-p", "--port", dest="port",
278 help="Network port (%s default)." % port,
279 default=port, type="int")
280 optionparser.add_option("-b", "--bind", dest="bind_address",
281 help="Bind address",
282 default="", metavar="ADDRESS")
283
284 if daemon_name in constants.DAEMONS_SSL:
285 default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
286 optionparser.add_option("--no-ssl", dest="ssl",
287 help="Do not secure HTTP protocol with SSL",
288 default=True, action="store_false")
289 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
290 help="SSL key",
291 default=default_key, type="string")
292 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
293 help="SSL certificate",
294 default=default_cert, type="string")
295
296 multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
297
298 options, args = optionparser.parse_args()
299
300 if hasattr(options, 'ssl') and options.ssl:
301 if not (options.ssl_cert and options.ssl_key):
302 print >> sys.stderr, "Need key and certificate to use ssl"
303 sys.exit(constants.EXIT_FAILURE)
304 for fname in (options.ssl_cert, options.ssl_key):
305 if not os.path.isfile(fname):
306 print >> sys.stderr, "Need ssl file %s to run" % fname
307 sys.exit(constants.EXIT_FAILURE)
308
309 if check_fn is not None:
310 check_fn(options, args)
311
312 utils.EnsureDirs(dirs)
313
314 if options.fork:
315 utils.CloseFDs()
316 utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
317
318 utils.WritePidFile(daemon_name)
319 try:
320 utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
321 debug=options.debug,
322 stderr_logging=not options.fork,
323 multithreaded=multithread,
324 program=daemon_name,
325 syslog=options.syslog,
326 console_logging=console_logging)
327 logging.info("%s daemon startup", daemon_name)
328 exec_fn(options, args)
329 finally:
330 utils.RemovePidFile(daemon_name)
331