22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """


import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils


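# Size of the pool of worker threads serving LUXI client requests; each
# request occupies one worker for its whole duration, so this bounds how
# many client operations can be in flight at once. The specific value is a
# tuning choice.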
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)

      server.awaker.signal()
    except:
      logging.exception("Send error")
      client.close_log()

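
# For reference: LUXI messages are serialized documents exchanged over the
# master socket. Judging from luxi.ParseRequest/FormatResponse above, a
# request carries the called method, its arguments and a protocol version,
# and the response carries a success flag plus a result. The sketch below is
# illustrative only; the authoritative key names live in the luxi module:
#
#   request:  {"method": "SubmitJob", "args": [[...opcodes...]], "version": N}
#   response: {"success": true, "result": <job id>}
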
110 """Handler for master peers.
111
112 """
113 _MAX_UNHANDLED = 1
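  # Presumably enforced by the message-stream parent class: the maximum
  # number of messages received on a connection but not yet handed to a
  # worker, so a single client cannot flood the request pool.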

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))

126 """Master Server.
127
128 This is the main asynchronous master server. It handles connections to the
129 master socket.
130
131 """
132 family = socket.AF_UNIX
133
  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

169 """Cleanup the server.
170
171 This involves shutting down the processor threads and the master
172 socket.
173
174 """
175 try:
176 self.close()
177 finally:
178 if self.request_workers:
179 self.request_workers.TerminateWorkers()
180 if self.context:
181 self.context.jobqueue.Shutdown()
182
185 """Class holding high-level client operations."""
188
190 queue = self.server.context.jobqueue
191
192
193
194
195
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

317 """Runs the specified opcode and returns the result.
318
319 """
320
321 proc = mcpu.Processor(self.server.context, None)
322
323
324
325 return proc.ExecOpCode(op, None)
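
  # Note: unlike job submission, which only enqueues work, _Query executes
  # the opcode synchronously in the calling worker thread. That is most
  # likely why the branches above reject use_locking: a lock-taking query
  # could block one of the few client request workers indefinitely.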


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if it is instantiated more than once.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    self.cfg = config.ConfigWriter()

    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetInstanceList())

    self.jobqueue = jqueue.JobQueue(self)

    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
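  # The assert above doubles as a "freeze" for the singleton: while __init__
  # runs, _instance is still None, so attribute assignments pass; once
  # __init__ stores the instance in self.__class__._instance, any further
  # assignment trips the assertion.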

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    self.cfg.AddNode(node, ec_id)

    self.jobqueue.AddNode(node)

    self.glm.add(locking.LEVEL_NODE, node.name)
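    # Ordering note: the node enters the configuration first, then the job
    # queue is notified (so queue files can start replicating to it), and
    # only then is its lock created and thus made visible to other
    # operations.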

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    self.cfg.RemoveNode(name)

    self.jobqueue.RemoveNode(name)

    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too in order to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()

  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def ActivateMasterIP():
  # Activate the master IP address on the master node; failures are logged
  # but not fatal, since masterd can run without the master IP being up.
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  utils.RunInSeparateProcess(ActivateMasterIP)
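
  # Both helpers run via utils.RunInSeparateProcess, presumably so that any
  # state they create (configuration objects, RPC connections) stays out of
  # the process that is about to become the long-running daemon.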


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # Remove a potentially stale socket; this is safe because the PID file
  # guarantees against concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data):
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
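
  # The nested try/finally blocks tear things down in reverse order of
  # setup: job queue and workers first (server_cleanup), then the RPC
  # layer, and finally the master socket is removed from the filesystem.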


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
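
# daemon.GenericMain drives the shared daemon startup sequence: parse the
# command line, run the check callback (CheckMasterd), daemonize and acquire
# the PID file, then invoke the prepare (PrepMasterd) and exec (ExecMasterd)
# callbacks in that order. This summary is inferred from how the callbacks
# are used above rather than from the ganeti.daemon module itself.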


if __name__ == "__main__":
  main()