#!/usr/bin/python
#
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
72 """Log information about a recently submitted job.
73
74 """
75 op_summary = utils.CommaJoin(op.Summary() for op in ops)
76
77 if status:
78 logging.info("New job with id %s, summary: %s", info, op_summary)
79 else:
80 logging.info("Failed to submit job, reason: '%s', summary: %s",
81 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # Awake the main thread so that it can write out the data
      server.awaker.signal()
    except:
      logging.exception("Send error")
      client.close_log()
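
# For illustration (wire format summarized from the luxi module, not defined
# in this file): a LUXI request is a JSON document along the lines of
#   {"method": "SubmitJob", "args": [...], "version": constants.LUXI_VERSION}
# and luxi.FormatResponse wraps the reply as
#   {"success": True, "result": ...}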


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))
145 """Logic for master daemon shutdown.
146
147 """
148
149 _CHECK_INTERVAL = 5.0
150
151
152
153 _SHUTDOWN_LINGER = 5.0
154
156 """Initializes this class.
157
158 """
159 self._had_active_jobs = None
160 self._linger_timeout = None
161
163 """Determines if master daemon is ready for shutdown.
164
165 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
166 @rtype: None or number
167 @return: None if master daemon is ready, timeout if the check must be
168 repeated
169
170 """
171 if jq_prepare_result:
172
173 logging.info("Job queue has been notified for shutdown but is still"
174 " busy; next check in %s seconds", self._CHECK_INTERVAL)
175 self._had_active_jobs = True
176 return self._CHECK_INTERVAL
177
178 if not self._had_active_jobs:
179
180 return None
181
182
183
184 if self._linger_timeout is None:
185 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
186
187 remaining = self._linger_timeout.Remaining()
188
189 logging.info("Job queue no longer busy; shutting down master daemon"
190 " in %s seconds", remaining)
191
192
193
194
195 if remaining < 0:
196 return None
197 else:
198 return remaining
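
# Illustrative sketch of how the check above is driven (the real loop lives
# in daemon.Mainloop.Run via the shutdown_wait_fn hook, see ExecMasterd
# below):
#
#   while True:
#     timeout = master.WaitForShutdown()
#     if timeout is None:
#       break                 # job queue drained, safe to shut down
#     time.sleep(timeout)     # busy or lingering, check again later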


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind to a temporary name first and rename into place once ownership
    # and permissions are set, so the socket never appears with open
    # permissions
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)
244 """Prepares server for shutdown.
245
246 """
247 if self._shutdown_check is None:
248 self._shutdown_check = _MasterShutdownCheck()
249
250 return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
251
253 """Cleanup the server.
254
255 This involves shutting down the processor threads and the master
256 socket.
257
258 """
259 try:
260 self.close()
261 finally:
262 if self.request_workers:
263 self.request_workers.TerminateWorkers()
264 if self.context:
265 self.context.jobqueue.Shutdown()


class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance; __init__ below asserts there is never more than one
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)
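
# Illustrative note (not from the original source): GanetiContext is a
# process-wide singleton; instantiating it twice trips the assertion in
# __init__:
#
#   ctx = GanetiContext()   # first call succeeds
#   GanetiContext()         # AssertionError: double GanetiContext instance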


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own configuration file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.
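
  For illustration (example values, not from the original docstring):
  if C{bootstrap.GatherMasterVotes} returns C{[("node1", 3), ("node2", 2)]}
  on a five-node cluster, node1 holds 3 of 5 votes and 3 >= 5 - 3, so
  node1 is allowed to continue starting up as master.
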
601 """
602 myself = netutils.Hostname.GetSysName()
603
604 cfg = config.ConfigWriter()
605 node_list = cfg.GetNodeList()
606 del cfg
607 retries = 6
608 while retries > 0:
609 votes = bootstrap.GatherMasterVotes(node_list)
610 if not votes:
611
612 return True
613 if votes[0][0] is None:
614 retries -= 1
615 time.sleep(10)
616 continue
617 break
618 if retries == 0:
619 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
620 " after multiple retries. Aborting startup")
621 logging.critical("Use the --no-voting option if you understand what"
622 " effects it has on the cluster state")
623 return False
624
625 all_votes = sum(item[1] for item in votes)
626 top_node, top_votes = votes[0]
627
628 result = False
629 if top_node != myself:
630 logging.critical("It seems we are not the master (top-voted node"
631 " is %s with %d out of %d votes)", top_node, top_votes,
632 all_votes)
633 elif top_votes < all_votes - top_votes:
634 logging.critical("It seems we are not the master (%d votes for,"
635 " %d votes against)", top_votes, all_votes - top_votes)
636 else:
637 result = True
638
639 return result


def ActivateMasterIP():
  # Activate the cluster's master IP address on this node. The exact RPC
  # plumbing below is reconstructed and assumes the BootstrapRunner API of
  # contemporary Ganeti versions.
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
658 """Initial checks whether to run or exit with a failure.
659
660 """
661 if args:
662 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
663 sys.exit(constants.EXIT_FAILURE)
664
665 ssconf.CheckMaster(options.debug)
666
667 try:
668 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
669 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
670 except KeyError:
671 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
672 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
673 sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have
  # to confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data):
743 """Main master daemon function, executed with the PID file held.
744
745 """
746 (mainloop, master) = prep_data
747 try:
748 rpc.Init()
749 try:
750 master.setup_queue()
751 try:
752 mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
753 finally:
754 master.server_cleanup()
755 finally:
756 rpc.Shutdown()
757 finally:
758 utils.RemoveFile(pathutils.MASTER_SOCKET)
759
760 logging.info("Clean master daemon shutdown")
761
764 """Main function"""
765 parser = OptionParser(description="Ganeti master daemon",
766 usage="%prog [-f] [-d]",
767 version="%%prog (ganeti) %s" %
768 constants.RELEASE_VERSION)
769 parser.add_option("--no-voting", dest="no_voting",
770 help="Do not check that the nodes agree on this node"
771 " being the master and start the daemon unconditionally",
772 default=False, action="store_true")
773 parser.add_option("--yes-do-it", dest="yes_do_it",
774 help="Override interactive check for --no-voting",
775 default=False, action="store_true")
776 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
777 ExecMasterd, multithreaded=True)
778