1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Master daemon program.
32
Some classes deviate from the standard style guide because the
inheritance from their parent classes requires it.
35
36 """
37
38
39
40
41 import os
42 import sys
43 import socket
44 import time
45 import tempfile
46 import logging
47
48
49 from ganeti import config
50 from ganeti import constants
51 from ganeti import daemon
52 from ganeti import jqueue
53 from ganeti import luxi
54 import ganeti.rpc.errors as rpcerr
55 from ganeti import utils
56 from ganeti import errors
57 from ganeti import workerpool
58 import ganeti.rpc.node as rpc
59 import ganeti.rpc.client as rpccl
60 from ganeti import ht
61
62
# Size of the "ClientReq" worker pool that processes requests received on
# the master socket.
CLIENT_REQUEST_WORKERS = 16

# Exit codes, re-exported from the constants module under the same names.
EXIT_NOTMASTER, EXIT_NODESETUP_ERROR = (constants.EXIT_NOTMASTER,
                                        constants.EXIT_NODESETUP_ERROR)
70 """Log information about a recently submitted job.
71
72 """
73 op_summary = utils.CommaJoin(op.Summary() for op in ops)
74
75 if status:
76 logging.info("New job with id %s, summary: %s", info, op_summary)
77 else:
78 logging.info("Failed to submit job, reason: '%s', summary: %s",
79 info, op_summary)
80
83
def RunTask(self, server, message, client):
  """Process one request received on the master socket.

  The message is parsed with C{rpccl.ParseRequest}. On a protocol error the
  client is closed without a reply. Otherwise the LUXI version is checked,
  the request is dispatched through C{ClientOps.handle_request}, and the
  outcome (the result, or an encoded error) is sent back to the client.

  @param server: server object handed to C{ClientOps}; its C{awaker} is
      signalled after the reply has been sent
  @param message: raw request message received from the client
  @param client: client connection; its C{send_message} and C{close_log}
      methods are used

  """
  client_ops = ClientOps(server)

  try:
    (method, args, ver) = rpccl.ParseRequest(message)
  except rpcerr.ProtocolError as err:
    # The request cannot be understood, so no reply is possible: log it and
    # drop the client connection.
    logging.error("Protocol Error: %s", err)
    client.close_log()
    return

  success = False
  try:
    # Requests that carry no version information (ver is None) are accepted.
    if ver is not None and ver != constants.LUXI_VERSION:
      raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                             (constants.LUXI_VERSION, ver))

    result = client_ops.handle_request(method, args)
    success = True
  except errors.GenericError as err:
    logging.exception("Unexpected exception")
    success = False
    result = errors.EncodeException(err)
  except BaseException as err:  # pylint: disable=W0703
    # Deliberate catch-all, like a bare "except:". Whatever went wrong, the
    # client still receives a failure reply (success stays False).
    logging.exception("Unexpected exception")
    result = "Caught exception: %s" % str(err)

  try:
    reply = rpccl.FormatResponse(success, result)
    client.send_message(reply)
    # Signal the server's awaker. Presumably this wakes its loop so the
    # reply gets flushed; that is not visible from here.
    server.awaker.signal()
  except BaseException:  # pylint: disable=W0703
    # Any failure while replying: log it and close the client connection.
    logging.exception("Send error")
    client.close_log()
123
126 """Handler for master peers.
127
128 """
129 _MAX_UNHANDLED = 1
130
131 - def __init__(self, server, connected_socket, client_address, family):
137
140
143 """Logic for master daemon shutdown.
144
145 """
146
147 _CHECK_INTERVAL = 5.0
148
149
150
151 _SHUTDOWN_LINGER = 5.0
152
154 """Initializes this class.
155
156 """
157 self._had_active_jobs = None
158 self._linger_timeout = None
159
161 """Determines if master daemon is ready for shutdown.
162
163 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
164 @rtype: None or number
165 @return: None if master daemon is ready, timeout if the check must be
166 repeated
167
168 """
169 if jq_prepare_result:
170
171 logging.info("Job queue has been notified for shutdown but is still"
172 " busy; next check in %s seconds", self._CHECK_INTERVAL)
173 self._had_active_jobs = True
174 return self._CHECK_INTERVAL
175
176 if not self._had_active_jobs:
177
178 return None
179
180
181
182 if self._linger_timeout is None:
183 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
184
185 remaining = self._linger_timeout.Remaining()
186
187 logging.info("Job queue no longer busy; shutting down master daemon"
188 " in %s seconds", remaining)
189
190
191
192
193 if remaining < 0:
194 return None
195 else:
196 return remaining
197
200 """Master Server.
201
202 This is the main asynchronous master server. It handles connections to the
203 master socket.
204
205 """
206 family = socket.AF_UNIX
207
209 """MasterServer constructor
210
211 @param address: the unix socket address to bind the MasterServer to
212 @param uid: The uid of the owner of the socket
213 @param gid: The gid of the owner of the socket
214
215 """
216 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
217 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
218 os.chmod(temp_name, 0770)
219 os.chown(temp_name, uid, gid)
220 os.rename(temp_name, address)
221
222 self.awaker = daemon.AsyncAwaker()
223
224
225 self.context = None
226 self.request_workers = None
227
228 self._shutdown_check = None
229
234
def setup_context(self):
  """Create the global context and the client request worker pool.

  Fills in C{self.context} and C{self.request_workers}, which the
  constructor initialises to None.

  """
  self.context = GanetiContext()
  worker_count = CLIENT_REQUEST_WORKERS
  self.request_workers = workerpool.WorkerPool("ClientReq", worker_count,
                                               ClientRequestWorker)
240
242 """Prepares server for shutdown.
243
244 """
245 if self._shutdown_check is None:
246 self._shutdown_check = _MasterShutdownCheck()
247
248 return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
249
251 """Cleanup the server.
252
253 This involves shutting down the processor threads and the master
254 socket.
255
256 """
257 try:
258 self.close()
259 finally:
260 if self.request_workers:
261 self.request_workers.TerminateWorkers()
262 if self.context:
263 self.context.jobqueue.Shutdown()
264 self.context.livelock.close()
265
268 """Class holding high-level client operations."""
271
272 @staticmethod
274 logging.info("Picking up new job from queue")
275 (job_id, ) = args
276 queue.PickupJob(job_id)
277 return job_id
278
279 @staticmethod
281 (job_id, priority) = args
282 logging.info("Received request to change priority for job %s to %s",
283 job_id, priority)
284 return queue.ChangeJobPriority(job_id, priority)
285
287 context = self.server.context
288 queue = context.jobqueue
289
290
291 if not isinstance(args, (tuple, list)):
292 logging.info("Received invalid arguments of type '%s'", type(args))
293 raise ValueError("Invalid arguments type '%s'" % type(args))
294
295 if method not in luxi.REQ_ALL:
296 logging.info("Received invalid request '%s'", method)
297 raise ValueError("Invalid operation '%s'" % method)
298
299 job_id = None
300 if method == luxi.REQ_PICKUP_JOB:
301 job_id = self._PickupJob(args, queue)
302 elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
303 job_id = self._ChangeJobPriority(args, queue)
304 else:
305 logging.info("Request '%s' not supported by masterd", method)
306 raise ValueError("Unsupported operation '%s'" % method)
307
308 return job_id
309
310
311 -class GanetiContext(object):
312 """Context common to all ganeti threads.
313
314 This class creates and holds common objects shared by all threads.
315
316 """
317
318
319 _instance = None
320
def __init__(self, livelock=None):
  """Create the single GanetiContext of this process.

  Asserts that no other instance has been registered yet, then sets up the
  livelock and the job queue. As the last step it registers itself in the
  class-level C{_instance} slot.

  @param livelock: livelock to use; if None, a new
      C{utils.livelock.LiveLock("masterd")} is created

  """
  cls = self.__class__
  assert cls._instance is None, "double GanetiContext instance"

  # Callers may inject their own livelock; otherwise create the default one.
  if livelock is not None:
    self.livelock = livelock
  else:
    self.livelock = utils.livelock.LiveLock("masterd")

  # The job queue is built on top of the configuration returned by GetConfig.
  initial_cfg = self.GetConfig(None)
  logging.debug("Creating the job queue")
  self.jobqueue = jqueue.JobQueue(self, initial_cfg)

  # Register the singleton last. From here on, __setattr__ rejects any
  # further attribute assignment on the instance.
  cls._instance = self
343
def __setattr__(self, name, value):
  """Allow attribute assignment only while the context is being built.

  While C{__init__} runs, the class-level C{_instance} is still None, so
  assignments go through. Once the instance has registered itself, every
  assignment trips the assertion.

  """
  frozen = self.__class__._instance is not None
  assert not frozen, "Attempt to modify Ganeti Context"
  object.__setattr__(self, name, value)
350
351 - def GetWConfdContext(self, ec_id):
353
def GetConfig(self, ec_id):
  """Return a configuration object tied to this context's livelock.

  @param ec_id: passed unchanged to C{config.GetConfig}
  @return: whatever C{config.GetConfig} returns

  """
  lock = self.livelock
  return config.GetConfig(ec_id, lock)
356
357
358
def GetRpc(self, cfg):
  """Return a new C{rpc.RpcRunner} for the given configuration.

  The runner's second argument is a callable that ignores its single
  argument and returns None.

  """
  def _Noop(_):
    return None

  return rpc.RpcRunner(cfg, _Noop)
361
def AddNode(self, cfg, node, ec_id):
  """Add a node to the configuration, then register it with the job queue.

  @param cfg: configuration object; C{cfg.AddNode(node, ec_id)} is called
  @param node: the node object being added
  @param ec_id: passed unchanged to C{cfg.AddNode}

  """
  # The configuration is updated before the job queue is told about the node.
  cfg.AddNode(node, ec_id)
  queue = self.jobqueue
  queue.AddNode(node)
371
def ReaddNode(self, node):
  """Handle a node that is already present in the configuration.

  Unlike C{AddNode}, the configuration is not touched here; only the job
  queue's C{AddNode} is called with the node.

  """
  queue = self.jobqueue
  queue.AddNode(node)
378
def RemoveNode(self, cfg, node):
  """Remove a node from the configuration and from the job queue.

  @param cfg: configuration object; C{cfg.RemoveNode} receives the node's
      C{uuid}
  @param node: the node object; its C{uuid} is used for the configuration
      and its C{name} for the job queue

  """
  # Drop the node from the configuration first (keyed by UUID) ...
  cfg.RemoveNode(node.uuid)

  # ... then from the job queue, which is keyed by node name.
  self.jobqueue.RemoveNode(node.name)
388
391 """Creates or removes the watcher pause file.
392
393 @type context: L{GanetiContext}
394 @param context: Global Ganeti context
395 @type until: None or int
396 @param until: Unix timestamp saying until when the watcher shouldn't run
397
398 """
399 node_names = context.GetConfig(ec_id).GetNodeList()
400
401 if until is None:
402 logging.info("Received request to no longer pause watcher")
403 else:
404 if not ht.TNumber(until):
405 raise TypeError("Duration must be numeric")
406
407 if until < time.time():
408 raise errors.GenericError("Unable to set pause end time in the past")
409
410 logging.info("Received request to pause watcher until %s", until)
411
412 result = context.rpc.call_set_watcher_pause(node_names, until)
413
414 errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
415 for (node_name, nres) in result.items()
416 if nres.fail_msg and not nres.offline)
417 if errmsg:
418 raise errors.OpExecError("Watcher pause was set where possible, but failed"
419 " on the following node(s): %s" % errmsg)
420
421 return until
422