1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Master daemon program.
23
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61
62
63 CLIENT_REQUEST_WORKERS = 16
64
65 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
66 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  @type status: bool
  @param status: whether the submission succeeded
  @param info: job ID on success, error description otherwise
  @param ops: list of opcodes the job consists of

  """
  # Build the opcode summary once; both branches log the same summary
  summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
82
83 - def RunTask(self, server, message, client):
84 """Process the request.
85
86 """
87 client_ops = ClientOps(server)
88
89 try:
90 (method, args, version) = luxi.ParseRequest(message)
91 except luxi.ProtocolError, err:
92 logging.error("Protocol Error: %s", err)
93 client.close_log()
94 return
95
96 success = False
97 try:
98
99 if version is not None and version != constants.LUXI_VERSION:
100 raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
101 (constants.LUXI_VERSION, version))
102
103 result = client_ops.handle_request(method, args)
104 success = True
105 except errors.GenericError, err:
106 logging.exception("Unexpected exception")
107 success = False
108 result = errors.EncodeException(err)
109 except:
110 logging.exception("Unexpected exception")
111 err = sys.exc_info()
112 result = "Caught exception: %s" % str(err[1])
113
114 try:
115 reply = luxi.FormatResponse(success, result)
116 client.send_message(reply)
117
118 server.awaker.signal()
119 except:
120 logging.exception("Send error")
121 client.close_log()
122
125 """Handler for master peers.
126
127 """
128 _MAX_UNHANDLED = 1
129
130 - def __init__(self, server, connected_socket, client_address, family):
136
139
142 """Logic for master daemon shutdown.
143
144 """
145
146 _CHECK_INTERVAL = 5.0
147
148
149
150 _SHUTDOWN_LINGER = 5.0
151
153 """Initializes this class.
154
155 """
156 self._had_active_jobs = None
157 self._linger_timeout = None
158
160 """Determines if master daemon is ready for shutdown.
161
162 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
163 @rtype: None or number
164 @return: None if master daemon is ready, timeout if the check must be
165 repeated
166
167 """
168 if jq_prepare_result:
169
170 logging.info("Job queue has been notified for shutdown but is still"
171 " busy; next check in %s seconds", self._CHECK_INTERVAL)
172 self._had_active_jobs = True
173 return self._CHECK_INTERVAL
174
175 if not self._had_active_jobs:
176
177 return None
178
179
180
181 if self._linger_timeout is None:
182 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
183
184 remaining = self._linger_timeout.Remaining()
185
186 logging.info("Job queue no longer busy; shutting down master daemon"
187 " in %s seconds", remaining)
188
189
190
191
192 if remaining < 0:
193 return None
194 else:
195 return remaining
196
199 """Master Server.
200
201 This is the main asynchronous master server. It handles connections to the
202 master socket.
203
204 """
205 family = socket.AF_UNIX
206
208 """MasterServer constructor
209
210 @param address: the unix socket address to bind the MasterServer to
211 @param uid: The uid of the owner of the socket
212 @param gid: The gid of the owner of the socket
213
214 """
215 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
216 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
217 os.chmod(temp_name, 0770)
218 os.chown(temp_name, uid, gid)
219 os.rename(temp_name, address)
220
221 self.awaker = daemon.AsyncAwaker()
222
223
224 self.context = None
225 self.request_workers = None
226
227 self._shutdown_check = None
228
233
239
241 """Prepares server for shutdown.
242
243 """
244 if self._shutdown_check is None:
245 self._shutdown_check = _MasterShutdownCheck()
246
247 return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
248
250 """Cleanup the server.
251
252 This involves shutting down the processor threads and the master
253 socket.
254
255 """
256 try:
257 self.close()
258 finally:
259 if self.request_workers:
260 self.request_workers.TerminateWorkers()
261 if self.context:
262 self.context.jobqueue.Shutdown()
263
266 """Class holding high-level client operations."""
269
271 context = self.server.context
272 queue = context.jobqueue
273
274
275 if not isinstance(args, (tuple, list)):
276 logging.info("Received invalid arguments of type '%s'", type(args))
277 raise ValueError("Invalid arguments type '%s'" % type(args))
278
279
280
281 if method == luxi.REQ_SUBMIT_JOB:
282 logging.info("Receiving new job")
283 (job_def, ) = args
284 ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
285 job_id = queue.SubmitJob(ops)
286 _LogNewJob(True, job_id, ops)
287 return job_id
288
289 elif method == luxi.REQ_SUBMIT_MANY_JOBS:
290 logging.info("Receiving multiple jobs")
291 (job_defs, ) = args
292 jobs = []
293 for ops in job_defs:
294 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
295 job_ids = queue.SubmitManyJobs(jobs)
296 for ((status, job_id), ops) in zip(job_ids, jobs):
297 _LogNewJob(status, job_id, ops)
298 return job_ids
299
300 elif method == luxi.REQ_CANCEL_JOB:
301 (job_id, ) = args
302 logging.info("Received job cancel request for %s", job_id)
303 return queue.CancelJob(job_id)
304
305 elif method == luxi.REQ_ARCHIVE_JOB:
306 (job_id, ) = args
307 logging.info("Received job archive request for %s", job_id)
308 return queue.ArchiveJob(job_id)
309
310 elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
311 (age, timeout) = args
312 logging.info("Received job autoarchive request for age %s, timeout %s",
313 age, timeout)
314 return queue.AutoArchiveJobs(age, timeout)
315
316 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
317 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
318 logging.info("Received job poll request for %s", job_id)
319 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
320 prev_log_serial, timeout)
321
322 elif method == luxi.REQ_QUERY:
323 (what, fields, qfilter) = args
324
325 if what in constants.QR_VIA_OP:
326 result = self._Query(opcodes.OpQuery(what=what, fields=fields,
327 qfilter=qfilter))
328 elif what == constants.QR_LOCK:
329 if qfilter is not None:
330 raise errors.OpPrereqError("Lock queries can't be filtered")
331 return context.glm.QueryLocks(fields)
332 elif what == constants.QR_JOB:
333 return queue.QueryJobs(fields, qfilter)
334 elif what in constants.QR_VIA_LUXI:
335 raise NotImplementedError
336 else:
337 raise errors.OpPrereqError("Resource type '%s' unknown" % what,
338 errors.ECODE_INVAL)
339
340 return result
341
342 elif method == luxi.REQ_QUERY_FIELDS:
343 (what, fields) = args
344 req = objects.QueryFieldsRequest(what=what, fields=fields)
345
346 try:
347 fielddefs = query.ALL_FIELDS[req.what]
348 except KeyError:
349 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
350 errors.ECODE_INVAL)
351
352 return query.QueryFields(fielddefs, req.fields)
353
354 elif method == luxi.REQ_QUERY_JOBS:
355 (job_ids, fields) = args
356 if isinstance(job_ids, (tuple, list)) and job_ids:
357 msg = utils.CommaJoin(job_ids)
358 else:
359 msg = str(job_ids)
360 logging.info("Received job query request for %s", msg)
361 return queue.OldStyleQueryJobs(job_ids, fields)
362
363 elif method == luxi.REQ_QUERY_INSTANCES:
364 (names, fields, use_locking) = args
365 logging.info("Received instance query request for %s", names)
366 if use_locking:
367 raise errors.OpPrereqError("Sync queries are not allowed",
368 errors.ECODE_INVAL)
369 op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
370 use_locking=use_locking)
371 return self._Query(op)
372
373 elif method == luxi.REQ_QUERY_NODES:
374 (names, fields, use_locking) = args
375 logging.info("Received node query request for %s", names)
376 if use_locking:
377 raise errors.OpPrereqError("Sync queries are not allowed",
378 errors.ECODE_INVAL)
379 op = opcodes.OpNodeQuery(names=names, output_fields=fields,
380 use_locking=use_locking)
381 return self._Query(op)
382
383 elif method == luxi.REQ_QUERY_GROUPS:
384 (names, fields, use_locking) = args
385 logging.info("Received group query request for %s", names)
386 if use_locking:
387 raise errors.OpPrereqError("Sync queries are not allowed",
388 errors.ECODE_INVAL)
389 op = opcodes.OpGroupQuery(names=names, output_fields=fields)
390 return self._Query(op)
391
392 elif method == luxi.REQ_QUERY_EXPORTS:
393 (nodes, use_locking) = args
394 if use_locking:
395 raise errors.OpPrereqError("Sync queries are not allowed",
396 errors.ECODE_INVAL)
397 logging.info("Received exports query request")
398 op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
399 return self._Query(op)
400
401 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
402 (fields, ) = args
403 logging.info("Received config values query request for %s", fields)
404 op = opcodes.OpClusterConfigQuery(output_fields=fields)
405 return self._Query(op)
406
407 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
408 logging.info("Received cluster info query request")
409 op = opcodes.OpClusterQuery()
410 return self._Query(op)
411
412 elif method == luxi.REQ_QUERY_TAGS:
413 (kind, name) = args
414 logging.info("Received tags query request")
415 op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
416 return self._Query(op)
417
418 elif method == luxi.REQ_SET_DRAIN_FLAG:
419 (drain_flag, ) = args
420 logging.info("Received queue drain flag change request to %s",
421 drain_flag)
422 return queue.SetDrainFlag(drain_flag)
423
424 elif method == luxi.REQ_SET_WATCHER_PAUSE:
425 (until, ) = args
426
427 if until is None:
428 logging.info("Received request to no longer pause the watcher")
429 else:
430 if not isinstance(until, (int, float)):
431 raise TypeError("Duration must be an integer or float")
432
433 if until < time.time():
434 raise errors.GenericError("Unable to set pause end time in the past")
435
436 logging.info("Received request to pause the watcher until %s", until)
437
438 return _SetWatcherPause(until)
439
440 else:
441 logging.info("Received invalid request '%s'", method)
442 raise ValueError("Invalid operation '%s'" % method)
443
445 """Runs the specified opcode and returns the result.
446
447 """
448
449 proc = mcpu.Processor(self.server.context, None, enable_locks=False)
450
451
452
453 return proc.ExecOpCode(op, None)
454
455
456 -class GanetiContext(object):
457 """Context common to all ganeti threads.
458
459 This class creates and holds common objects shared by all threads.
460
461 """
462
463
464 _instance = None
465
466 - def __init__(self):
467 """Constructs a new GanetiContext object.
468
469 There should be only a GanetiContext object at any time, so this
470 function raises an error if this is not the case.
471
472 """
473 assert self.__class__._instance is None, "double GanetiContext instance"
474
475
476 self.cfg = config.ConfigWriter()
477
478
479 self.glm = locking.GanetiLockManager(
480 self.cfg.GetNodeList(),
481 self.cfg.GetNodeGroupList(),
482 self.cfg.GetInstanceList())
483
484 self.cfg.SetContext(self)
485
486
487 self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
488
489
490 self.jobqueue = jqueue.JobQueue(self)
491
492
493 self.__class__._instance = self
494
495 - def __setattr__(self, name, value):
496 """Setting GanetiContext attributes is forbidden after initialization.
497
498 """
499 assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
500 object.__setattr__(self, name, value)
501
502 - def AddNode(self, node, ec_id):
503 """Adds a node to the configuration and lock manager.
504
505 """
506
507 self.cfg.AddNode(node, ec_id)
508
509
510 self.jobqueue.AddNode(node)
511
512
513 self.glm.add(locking.LEVEL_NODE, node.name)
514 self.glm.add(locking.LEVEL_NODE_RES, node.name)
515
516 - def ReaddNode(self, node):
517 """Updates a node that's already in the configuration
518
519 """
520
521 self.jobqueue.AddNode(node)
522
523 - def RemoveNode(self, name):
524 """Removes a node from the configuration and lock manager.
525
526 """
527
528 self.cfg.RemoveNode(name)
529
530
531 self.jobqueue.RemoveNode(name)
532
533
534 self.glm.remove(locking.LEVEL_NODE, name)
535 self.glm.remove(locking.LEVEL_NODE_RES, name)
536
552
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if this node should run as master

  """
  myself = netutils.Hostname.GetSysName()

  # Read the node list from our own configuration, then release the object
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # No other nodes: single-node cluster, we are master by definition
      return True
    if votes[0][0] is None:
      # Top answer is "unknown" -- nodes may still be booting; retry
      retries -= 1
      time.sleep(10)
      continue
    break

  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
613
628
631 """Initial checks whether to run or exit with a failure.
632
633 """
634 if args:
635 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
636 sys.exit(constants.EXIT_FAILURE)
637
638 ssconf.CheckMaster(options.debug)
639
640 try:
641 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
642 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
643 except KeyError:
644 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
645 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
646 sys.exit(constants.EXIT_FAILURE)
647
648
649 runtime.InitArchInfo()
650
651
652 try:
653 config.ConfigWriter()
654 except errors.ConfigVersionMismatch, err:
655 v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
656 v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
657 print >> sys.stderr, \
658 ("Configuration version mismatch. The current Ganeti software"
659 " expects version %s, but the on-disk configuration file has"
660 " version %s. This is likely the result of upgrading the"
661 " software without running the upgrade procedure. Please contact"
662 " your cluster administrator or complete the upgrade using the"
663 " cfgupgrade utility, after reading the upgrade notes." %
664 (v1, v2))
665 sys.exit(constants.EXIT_FAILURE)
666 except errors.ConfigurationError, err:
667 print >> sys.stderr, \
668 ("Configuration error while opening the configuration file: %s\n"
669 "This might be caused by an incomplete software upgrade or"
670 " by a corrupted configuration file. Until the problem is fixed"
671 " the master daemon cannot start." % str(err))
672 sys.exit(constants.EXIT_FAILURE)
673
674
675
676 if options.no_voting:
677 if not options.yes_do_it:
678 sys.stdout.write("The 'no voting' option has been selected.\n")
679 sys.stdout.write("This is dangerous, please confirm by"
680 " typing uppercase 'yes': ")
681 sys.stdout.flush()
682
683 confirmation = sys.stdin.readline().strip()
684 if confirmation != "YES":
685 print >> sys.stderr, "Aborting."
686 sys.exit(constants.EXIT_FAILURE)
687
688 else:
689
690
691
692 if not utils.RunInSeparateProcess(CheckAgreement):
693 sys.exit(constants.EXIT_FAILURE)
694
695
696
697
698
699 utils.RunInSeparateProcess(ActivateMasterIP)
700
713
def ExecMasterd(options, args, prep_data):
  """Main master daemon function, executed with the PID file held.

  @param options: parsed command-line options (unused here)
  @param args: positional arguments (unused here)
  @param prep_data: (mainloop, master) tuple produced during preparation

  """
  (mainloop, master) = prep_data
  # Nested try/finally blocks guarantee teardown in strict reverse order
  # of setup: server cleanup, then RPC shutdown, then socket removal
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
734
def Main():
  """Main function.

  Builds the option parser and hands control to the generic daemon
  framework.

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
751