31 """Master daemon program.
32
33 Some classes deviates from the standard style guide since the
34 inheritance from parent classes requires it.
35
36 """
37
38
39
40
import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
83 """Log information about a recently submitted job.
84
85 """
86 op_summary = utils.CommaJoin(op.Summary() for op in ops)
87
88 if status:
89 logging.info("New job with id %s, summary: %s", info, op_summary)
90 else:
91 logging.info("Failed to submit job, reason: '%s', summary: %s",
92 info, op_summary)
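
# Example log lines produced by _LogNewJob (illustrative only; the summary
# text comes from OpCode.Summary()):
#   New job with id 42, summary: CLUSTER_VERIFY
#   Failed to submit job, reason: '<error>', summary: CLUSTER_VERIFY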


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
98 """Process the request.
99
100 """
101 client_ops = ClientOps(server)
102
103 try:
104 (method, args, ver) = luxi.ParseRequest(message)
105 except luxi.ProtocolError, err:
106 logging.error("Protocol Error: %s", err)
107 client.close_log()
108 return
109
110 success = False
111 try:
112
113 if ver is not None and ver != constants.LUXI_VERSION:
114 raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
115 (constants.LUXI_VERSION, ver))
116
117 result = client_ops.handle_request(method, args)
118 success = True
119 except errors.GenericError, err:
120 logging.exception("Unexpected exception")
121 success = False
122 result = errors.EncodeException(err)
123 except:
124 logging.exception("Unexpected exception")
125 err = sys.exc_info()
126 result = "Caught exception: %s" % str(err[1])
127
128 try:
129 reply = luxi.FormatResponse(success, result)
130 client.send_message(reply)
131
132 server.awaker.signal()
133 except:
134 logging.exception("Send error")
135 client.close_log()
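
# For illustration (not part of the original module): each LUXI message is a
# single JSON object per direction, so a request/response pair handled by
# RunTask above looks roughly like
#   request:  {"method": "QueryClusterInfo", "args": [], "version": ...}
#   response: {"success": true, "result": {...}}
# where the version field is checked against constants.LUXI_VERSION.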


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))
156 """Logic for master daemon shutdown.
157
158 """
159
160 _CHECK_INTERVAL = 5.0
161
162
163
164 _SHUTDOWN_LINGER = 5.0
165
167 """Initializes this class.
168
169 """
170 self._had_active_jobs = None
171 self._linger_timeout = None
172
174 """Determines if master daemon is ready for shutdown.
175
176 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
177 @rtype: None or number
178 @return: None if master daemon is ready, timeout if the check must be
179 repeated
180
181 """
182 if jq_prepare_result:
183
184 logging.info("Job queue has been notified for shutdown but is still"
185 " busy; next check in %s seconds", self._CHECK_INTERVAL)
186 self._had_active_jobs = True
187 return self._CHECK_INTERVAL
188
189 if not self._had_active_jobs:
190
191 return None
192
193
194
195 if self._linger_timeout is None:
196 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
197
198 remaining = self._linger_timeout.Remaining()
199
200 logging.info("Job queue no longer busy; shutting down master daemon"
201 " in %s seconds", remaining)
202
203
204
205
206 if remaining < 0:
207 return None
208 else:
209 return remaining
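
# Illustrative timeline for the check above: if a shutdown request arrives
# while jobs are still running, the queue is polled every _CHECK_INTERVAL
# (5.0) seconds until it reports idle, after which the daemon lingers another
# _SHUTDOWN_LINGER (5.0) seconds so clients can still fetch final job status.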


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Create the socket under a temporary name and rename it into place, so
    # the advertised path never exists with the wrong permissions
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
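
# Illustrative use (mirrors PrepMasterd/ExecMasterd below; uid and gid are
# resolved in CheckMasterd):
#   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
#   master.setup_queue()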


class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking
    # mode; queries therefore run with locking disabled
    return proc.ExecOpCode(op, None)
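
# Illustration only (client side, not part of this module): a LUXI call such
# as luxi.Client(address=pathutils.MASTER_SOCKET).QueryClusterInfo() arrives
# in handle_request() as method == luxi.REQ_QUERY_CLUSTER_INFO with empty
# args and is answered via the OpClusterQuery opcode.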


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
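
# Note (illustrative): the context is instantiated exactly once, in
# MasterServer.setup_queue(); constructing a second GanetiContext() trips the
# singleton assertion in __init__ above.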


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
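
# Usage sketch (illustrative): pause the watcher for 30 minutes with
#   _SetWatcherPause(context, int(time.time() + 1800))
# and lift the pause again with
#   _SetWatcherPause(context, None)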


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()

  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
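
# Worked example (illustrative): with votes == [("node1", 3), ("node2", 2)],
# all_votes is 5 and top_votes is 3; since 3 >= 5 - 3, the startup proceeds
# only if this daemon is running on "node1".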


@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the master IP on the master node. This body is a sketch of the
  # original helper, assuming the config and bootstrap-RPC APIs used
  # elsewhere in this module.
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here yet
  result = runner.call_node_activate_master_ip(
    cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
683 """Initial checks whether to run or exit with a failure.
684
685 """
686 if args:
687 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
688 sys.exit(constants.EXIT_FAILURE)
689
690 ssconf.CheckMaster(options.debug)
691
692 try:
693 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
694 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
695 except KeyError:
696 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
697 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
698 sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we must
  # verify further by asking the rest of the cluster.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
768 """Main master daemon function, executed with the PID file held.
769
770 """
771 (mainloop, master) = prep_data
772 try:
773 rpc.Init()
774 try:
775 master.setup_queue()
776 try:
777 mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
778 finally:
779 master.server_cleanup()
780 finally:
781 rpc.Shutdown()
782 finally:
783 utils.RemoveFile(pathutils.MASTER_SOCKET)
784
785 logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                         " being the master and start the daemon"
                         " unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
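
# Typical entry point (illustrative; assumes this module lives at
# ganeti.server.masterd as in Ganeti's source tree): the ganeti-masterd
# daemon script imports this module and calls Main().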