1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Master daemon program.
23
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
# Number of worker threads handling LUXI client requests (presumably used
# where the request worker pool is created; that code is not visible here).
CLIENT_REQUEST_WORKERS = 16

# Module-level aliases for exit codes defined in ganeti.constants.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
69
def RunTask(self, server, message, client):
  """Process a single LUXI request received from a client.

  The raw message is parsed, the call is dispatched through a L{ClientOps}
  instance and the formatted response is sent back on C{client}. If the
  message cannot be parsed, the error is logged, C{client.close_log()} is
  called and no response is sent at all.

  @param server: the master server the request arrived on
  @param message: the raw request message
  @param client: the client connection the reply is sent on

  """
  ops = ClientOps(server)

  try:
    (method, args, version) = luxi.ParseRequest(message)
  except luxi.ProtocolError:
    protocol_err = sys.exc_info()[1]
    logging.error("Protocol Error: %s", protocol_err)
    client.close_log()
    return

  # Only becomes True when the request was handled without raising
  success = False
  try:
    # Requests that carry no version at all are accepted as-is
    mismatch = version is not None and version != constants.LUXI_VERSION
    if mismatch:
      raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                             (constants.LUXI_VERSION, version))
    result = ops.handle_request(method, args)
  except errors.GenericError:
    ganeti_err = sys.exc_info()[1]
    logging.exception("Unexpected exception")
    # Ganeti errors are returned in encoded form rather than as plain text
    result = errors.EncodeException(ganeti_err)
  except:
    # Deliberately catch everything so that any other failure is still
    # reported back to the client as a text message
    logging.exception("Unexpected exception")
    result = "Caught exception: %s" % str(sys.exc_info()[1])
  else:
    success = True

  try:
    client.send_message(luxi.FormatResponse(success, result))
    # NOTE(review): presumably wakes the main loop so it writes the queued
    # reply out; confirm against daemon.AsyncAwaker
    server.awaker.signal()
  except:
    logging.exception("Send error")
    client.close_log()
109
112 """Handler for master peers.
113
114 """
115 _MAX_UNHANDLED = 1
116
117 - def __init__(self, server, connected_socket, client_address, family):
123
126
129 """Master Server.
130
131 This is the main asynchronous master server. It handles connections to the
132 master socket.
133
134 """
135 family = socket.AF_UNIX
136
137 - def __init__(self, mainloop, address, uid, gid):
138 """MasterServer constructor
139
140 @type mainloop: ganeti.daemon.Mainloop
141 @param mainloop: Mainloop used to poll for I/O events
142 @param address: the unix socket address to bind the MasterServer to
143 @param uid: The uid of the owner of the socket
144 @param gid: The gid of the owner of the socket
145
146 """
147 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149 os.chmod(temp_name, 0770)
150 os.chown(temp_name, uid, gid)
151 os.rename(temp_name, address)
152
153 self.mainloop = mainloop
154 self.awaker = daemon.AsyncAwaker()
155
156
157 self.context = None
158 self.request_workers = None
159
164
170
172 """Cleanup the server.
173
174 This involves shutting down the processor threads and the master
175 socket.
176
177 """
178 try:
179 self.close()
180 finally:
181 if self.request_workers:
182 self.request_workers.TerminateWorkers()
183 if self.context:
184 self.context.jobqueue.Shutdown()
185
188 """Class holding high-level client operations."""
191
193 queue = self.server.context.jobqueue
194
195
196
197
198
199 if method == luxi.REQ_SUBMIT_JOB:
200 logging.info("Received new job")
201 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
202 return queue.SubmitJob(ops)
203
204 if method == luxi.REQ_SUBMIT_MANY_JOBS:
205 logging.info("Received multiple jobs")
206 jobs = []
207 for ops in args:
208 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
209 return queue.SubmitManyJobs(jobs)
210
211 elif method == luxi.REQ_CANCEL_JOB:
212 job_id = args
213 logging.info("Received job cancel request for %s", job_id)
214 return queue.CancelJob(job_id)
215
216 elif method == luxi.REQ_ARCHIVE_JOB:
217 job_id = args
218 logging.info("Received job archive request for %s", job_id)
219 return queue.ArchiveJob(job_id)
220
221 elif method == luxi.REQ_AUTOARCHIVE_JOBS:
222 (age, timeout) = args
223 logging.info("Received job autoarchive request for age %s, timeout %s",
224 age, timeout)
225 return queue.AutoArchiveJobs(age, timeout)
226
227 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
228 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
229 logging.info("Received job poll request for %s", job_id)
230 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
231 prev_log_serial, timeout)
232
233 elif method == luxi.REQ_QUERY:
234 req = objects.QueryRequest.FromDict(args)
235
236 if req.what in constants.QR_VIA_OP:
237 result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
238 filter=req.filter))
239 elif req.what == constants.QR_LOCK:
240 if req.filter is not None:
241 raise errors.OpPrereqError("Lock queries can't be filtered")
242 return self.server.context.glm.QueryLocks(req.fields)
243 elif req.what in constants.QR_VIA_LUXI:
244 raise NotImplementedError
245 else:
246 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
247 errors.ECODE_INVAL)
248
249 return result
250
251 elif method == luxi.REQ_QUERY_FIELDS:
252 req = objects.QueryFieldsRequest.FromDict(args)
253
254 try:
255 fielddefs = query.ALL_FIELDS[req.what]
256 except KeyError:
257 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
258 errors.ECODE_INVAL)
259
260 return query.QueryFields(fielddefs, req.fields)
261
262 elif method == luxi.REQ_QUERY_JOBS:
263 (job_ids, fields) = args
264 if isinstance(job_ids, (tuple, list)) and job_ids:
265 msg = utils.CommaJoin(job_ids)
266 else:
267 msg = str(job_ids)
268 logging.info("Received job query request for %s", msg)
269 return queue.QueryJobs(job_ids, fields)
270
271 elif method == luxi.REQ_QUERY_INSTANCES:
272 (names, fields, use_locking) = args
273 logging.info("Received instance query request for %s", names)
274 if use_locking:
275 raise errors.OpPrereqError("Sync queries are not allowed",
276 errors.ECODE_INVAL)
277 op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
278 use_locking=use_locking)
279 return self._Query(op)
280
281 elif method == luxi.REQ_QUERY_NODES:
282 (names, fields, use_locking) = args
283 logging.info("Received node query request for %s", names)
284 if use_locking:
285 raise errors.OpPrereqError("Sync queries are not allowed",
286 errors.ECODE_INVAL)
287 op = opcodes.OpNodeQuery(names=names, output_fields=fields,
288 use_locking=use_locking)
289 return self._Query(op)
290
291 elif method == luxi.REQ_QUERY_GROUPS:
292 (names, fields, use_locking) = args
293 logging.info("Received group query request for %s", names)
294 if use_locking:
295 raise errors.OpPrereqError("Sync queries are not allowed",
296 errors.ECODE_INVAL)
297 op = opcodes.OpGroupQuery(names=names, output_fields=fields)
298 return self._Query(op)
299
300 elif method == luxi.REQ_QUERY_EXPORTS:
301 nodes, use_locking = args
302 if use_locking:
303 raise errors.OpPrereqError("Sync queries are not allowed",
304 errors.ECODE_INVAL)
305 logging.info("Received exports query request")
306 op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
307 return self._Query(op)
308
309 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
310 fields = args
311 logging.info("Received config values query request for %s", fields)
312 op = opcodes.OpClusterConfigQuery(output_fields=fields)
313 return self._Query(op)
314
315 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
316 logging.info("Received cluster info query request")
317 op = opcodes.OpClusterQuery()
318 return self._Query(op)
319
320 elif method == luxi.REQ_QUERY_TAGS:
321 kind, name = args
322 logging.info("Received tags query request")
323 op = opcodes.OpTagsGet(kind=kind, name=name)
324 return self._Query(op)
325
326 elif method == luxi.REQ_QUERY_LOCKS:
327 (fields, sync) = args
328 logging.info("Received locks query request")
329 if sync:
330 raise NotImplementedError("Synchronous queries are not implemented")
331 return self.server.context.glm.OldStyleQueryLocks(fields)
332
333 elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
334 drain_flag = args
335 logging.info("Received queue drain flag change request to %s",
336 drain_flag)
337 return queue.SetDrainFlag(drain_flag)
338
339 elif method == luxi.REQ_SET_WATCHER_PAUSE:
340 (until, ) = args
341
342 if until is None:
343 logging.info("Received request to no longer pause the watcher")
344 else:
345 if not isinstance(until, (int, float)):
346 raise TypeError("Duration must be an integer or float")
347
348 if until < time.time():
349 raise errors.GenericError("Unable to set pause end time in the past")
350
351 logging.info("Received request to pause the watcher until %s", until)
352
353 return _SetWatcherPause(until)
354
355 else:
356 logging.info("Received invalid request '%s'", method)
357 raise ValueError("Invalid operation '%s'" % method)
358
360 """Runs the specified opcode and returns the result.
361
362 """
363
364 proc = mcpu.Processor(self.server.context, None)
365
366
367
368 return proc.ExecOpCode(op, None)
369
370
class GanetiContext(object):
  """Holds the objects shared by all Ganeti threads in the master daemon.

  Only one instance may be constructed. A second construction trips an
  assertion. Once construction has finished, every further attribute
  assignment on instances of this class trips an assertion as well.

  """
  # The single live instance. It stays None until the constructor finishes;
  # __setattr__ only allows assignments while it is None.
  _instance = None

  def __init__(self):
    """Create the configuration writer, lock manager and job queue.

    @raise AssertionError: if a GanetiContext has already been constructed

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # The configuration's node, node group and instance lists seed the
    # lock manager.
    self.cfg = config.ConfigWriter()
    cfg = self.cfg
    self.glm = locking.GanetiLockManager(cfg.GetNodeList(),
                                         cfg.GetNodeGroupList(),
                                         cfg.GetInstanceList())

    # The job queue receives this context object itself
    self.jobqueue = jqueue.JobQueue(self)

    # Must come last: once _instance is set, __setattr__ rejects any
    # further assignment, which freezes the object.
    cls._instance = self

  def __setattr__(self, name, value):
    """Set an attribute, which is only allowed before construction completes.

    @raise AssertionError: once the singleton has been registered

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Register a new node with the configuration, job queue and locks.

    @param node: the node object; its C{name} attribute becomes the name
        of the new node-level lock
    @param ec_id: forwarded unchanged to the configuration's C{AddNode}
        (presumably an execution-context id -- confirm)

    """
    # Order matters: if the job queue step raises, no node lock is added
    self.cfg.AddNode(node, ec_id)
    self.jobqueue.AddNode(node)
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Re-synchronise the job queue for a node already in the configuration.

    Neither the configuration nor the lock manager is modified.

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Remove a node from the configuration, job queue and lock manager.

    @param name: the name of the node to remove

    """
    self.cfg.RemoveNode(name)
    self.jobqueue.RemoveNode(name)
    self.glm.remove(locking.LEVEL_NODE, name)
444
460
464 """Check the agreement on who is the master.
465
466 The function uses a very simple algorithm: we must get more positive
467 than negative answers. Since in most of the cases we are the master,
468 we'll use our own config file for getting the node list. In the
469 future we could collect the current node list from our (possibly
470 obsolete) known nodes.
471
472 In order to account for cold-start of all nodes, we retry for up to
473 a minute until we get a real answer as the top-voted one. If the
474 nodes are more out-of-sync, for now manual startup of the master
475 should be attempted.
476
477 Note that for a cluster with an even number of nodes, we need at least
478 half of the nodes (besides ourselves) to vote for us. This creates a
479 problem on two-node clusters, since in this case we require the
480 other node to be up too to confirm our status.
481
482 """
483 myself = netutils.Hostname.GetSysName()
484
485 cfg = config.ConfigWriter()
486 node_list = cfg.GetNodeList()
487 del cfg
488 retries = 6
489 while retries > 0:
490 votes = bootstrap.GatherMasterVotes(node_list)
491 if not votes:
492
493 return True
494 if votes[0][0] is None:
495 retries -= 1
496 time.sleep(10)
497 continue
498 break
499 if retries == 0:
500 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
501 " after multiple retries. Aborting startup")
502 logging.critical("Use the --no-voting option if you understand what"
503 " effects it has on the cluster state")
504 return False
505
506 all_votes = sum(item[1] for item in votes)
507 top_node, top_votes = votes[0]
508
509 result = False
510 if top_node != myself:
511 logging.critical("It seems we are not the master (top-voted node"
512 " is %s with %d out of %d votes)", top_node, top_votes,
513 all_votes)
514 elif top_votes < all_votes - top_votes:
515 logging.critical("It seems we are not the master (%d votes for,"
516 " %d votes against)", top_votes, all_votes - top_votes)
517 else:
518 result = True
519
520 return result
521
531
534 """Initial checks whether to run or exit with a failure.
535
536 """
537 if args:
538 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
539 sys.exit(constants.EXIT_FAILURE)
540
541 ssconf.CheckMaster(options.debug)
542
543 try:
544 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
545 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
546 except KeyError:
547 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
548 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
549 sys.exit(constants.EXIT_FAILURE)
550
551
552 try:
553 config.ConfigWriter()
554 except errors.ConfigVersionMismatch, err:
555 v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
556 v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
557 print >> sys.stderr, \
558 ("Configuration version mismatch. The current Ganeti software"
559 " expects version %s, but the on-disk configuration file has"
560 " version %s. This is likely the result of upgrading the"
561 " software without running the upgrade procedure. Please contact"
562 " your cluster administrator or complete the upgrade using the"
563 " cfgupgrade utility, after reading the upgrade notes." %
564 (v1, v2))
565 sys.exit(constants.EXIT_FAILURE)
566 except errors.ConfigurationError, err:
567 print >> sys.stderr, \
568 ("Configuration error while opening the configuration file: %s\n"
569 "This might be caused by an incomplete software upgrade or"
570 " by a corrupted configuration file. Until the problem is fixed"
571 " the master daemon cannot start." % str(err))
572 sys.exit(constants.EXIT_FAILURE)
573
574
575
576 if options.no_voting:
577 if not options.yes_do_it:
578 sys.stdout.write("The 'no voting' option has been selected.\n")
579 sys.stdout.write("This is dangerous, please confirm by"
580 " typing uppercase 'yes': ")
581 sys.stdout.flush()
582
583 confirmation = sys.stdin.readline().strip()
584 if confirmation != "YES":
585 print >> sys.stderr, "Aborting."
586 sys.exit(constants.EXIT_FAILURE)
587
588 else:
589
590
591
592 if not utils.RunInSeparateProcess(CheckAgreement):
593 sys.exit(constants.EXIT_FAILURE)
594
595
596
597
598
599 utils.RunInSeparateProcess(ActivateMasterIP)
600
614
617 """Main master daemon function, executed with the PID file held.
618
619 """
620 (mainloop, master) = prep_data
621 try:
622 rpc.Init()
623 try:
624 master.setup_queue()
625 try:
626 mainloop.Run()
627 finally:
628 master.server_cleanup()
629 finally:
630 rpc.Shutdown()
631 finally:
632 utils.RemoveFile(constants.MASTER_SOCKET)
633
636 """Main function"""
637 parser = OptionParser(description="Ganeti master daemon",
638 usage="%prog [-f] [-d]",
639 version="%%prog (ganeti) %s" %
640 constants.RELEASE_VERSION)
641 parser.add_option("--no-voting", dest="no_voting",
642 help="Do not check that the nodes agree on this node"
643 " being the master and start the daemon unconditionally",
644 default=False, action="store_true")
645 parser.add_option("--yes-do-it", dest="yes_do_it",
646 help="Override interactive check for --no-voting",
647 default=False, action="store_true")
648 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
649 ExecMasterd, multithreaded=True)
650