31 """Master daemon program.
32
33 Some classes deviates from the standard style guide since the
34 inheritance from parent classes requires it.
35
36 """
37
38
39
40
import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
import ganeti.rpc.errors as rpcerr
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
import ganeti.rpc.node as rpc
import ganeti.rpc.client as rpccl
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
85 """Log information about a recently submitted job.
86
87 """
88 op_summary = utils.CommaJoin(op.Summary() for op in ops)
89
90 if status:
91 logging.info("New job with id %s, summary: %s", info, op_summary)
92 else:
93 logging.info("Failed to submit job, reason: '%s', summary: %s",
94 info, op_summary)
95


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = rpccl.ParseRequest(message)
    except rpcerr.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify the client's version if one was given with the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = rpccl.FormatResponse(success, result)
      client.send_message(reply)
      # Awake the main thread so that it can write out the data
      server.awaker.signal()
    except:
      logging.exception("Send error")
      client.close_log()
141 """Handler for master peers.
142
143 """
144 _MAX_UNHANDLED = 1
145
146 - def __init__(self, server, connected_socket, client_address, family):
152
155
158 """Logic for master daemon shutdown.
159
160 """
161
162 _CHECK_INTERVAL = 5.0
163
164
165
166 _SHUTDOWN_LINGER = 5.0
167
169 """Initializes this class.
170
171 """
172 self._had_active_jobs = None
173 self._linger_timeout = None
174
176 """Determines if master daemon is ready for shutdown.
177
178 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
179 @rtype: None or number
180 @return: None if master daemon is ready, timeout if the check must be
181 repeated
182
183 """
184 if jq_prepare_result:
185
186 logging.info("Job queue has been notified for shutdown but is still"
187 " busy; next check in %s seconds", self._CHECK_INTERVAL)
188 self._had_active_jobs = True
189 return self._CHECK_INTERVAL
190
191 if not self._had_active_jobs:
192
193 return None
194
195
196
197 if self._linger_timeout is None:
198 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
199
200 remaining = self._linger_timeout.Remaining()
201
202 logging.info("Job queue no longer busy; shutting down master daemon"
203 " in %s seconds", remaining)
204
205
206
207
208 if remaining < 0:
209 return None
210 else:
211 return remaining
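

# Shutdown sequence sketch (illustrative): the mainloop calls
# MasterServer.WaitForShutdown() repeatedly. While the job queue is still
# busy, each _MasterShutdownCheck call returns _CHECK_INTERVAL (5.0s) and is
# retried; once the queue reports idle, the check lingers for up to
# _SHUTDOWN_LINGER seconds so clients can fetch final job status, then
# returns None, which lets the daemon shut down.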


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind to a temporary name first and rename into place afterwards, so
    # the socket appears atomically with the right ownership and permissions
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # Each connection gets its own message-stream handler
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)
257 """Prepares server for shutdown.
258
259 """
260 if self._shutdown_check is None:
261 self._shutdown_check = _MasterShutdownCheck()
262
263 return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
264
266 """Cleanup the server.
267
268 This involves shutting down the processor threads and the master
269 socket.
270
271 """
272 try:
273 self.close()
274 finally:
275 if self.request_workers:
276 self.request_workers.TerminateWorkers()
277 if self.context:
278 self.context.jobqueue.Shutdown()
279
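

# Lifecycle sketch (illustrative): PrepMasterd() below creates the
# MasterServer while the PID file is held, ExecMasterd() calls setup_queue()
# only after daemonizing (worker threads must not exist before the fork) and
# server_cleanup() once the mainloop returns.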
282 """Class holding high-level client operations."""
285
287 context = self.server.context
288 queue = context.jobqueue
289
290
291 if not isinstance(args, (tuple, list)):
292 logging.info("Received invalid arguments of type '%s'", type(args))
293 raise ValueError("Invalid arguments type '%s'" % type(args))
294
295 if method not in luxi.REQ_ALL:
296 logging.info("Received invalid request '%s'", method)
297 raise ValueError("Invalid operation '%s'" % method)
298
299
300
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient(query=True)
        result = luxi_client.Query(what, fields, qfilter).ToDict()
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Currently, only the masterd queries are done using locks.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # We do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # Notify the job queue of the new node
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the job queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
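

# Note (illustrative): GanetiContext is a strict singleton. The first
# instantiation (from MasterServer.setup_queue()) wires together the config,
# lock manager, RPC runner and job queue; once _instance is set, __setattr__
# asserts on any further attribute assignment, so a second GanetiContext()
# would fail immediately.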


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
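

# Usage sketch (illustrative): a client pausing the watcher for an hour ends
# up calling _SetWatcherPause(context, int(time.time()) + 3600) via the
# luxi.REQ_SET_WATCHER_PAUSE request above, while _SetWatcherPause(context,
# None) lifts the pause again.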


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()

  # Temporary config writer instantiation, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # No votes at all, this could be a single-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
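

# Worked example (illustrative): on a five-node cluster where
# bootstrap.GatherMasterVotes() returns [("node1", 3), ("node2", 1)] (one
# node did not answer), all_votes is 4 and node1 is confirmed as master
# since 3 >= 4 - 3. With a [("node1", 2), ("node2", 2)] tie, the test
# "top_votes < all_votes - top_votes" is false, so the top-voted node
# still wins.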


def ActivateMasterIP():
  # Activate the master IP on this node via RPC
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # We use the node name, as the configuration is only available here yet
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
644 """Initial checks whether to run or exit with a failure.
645
646 """
647 if args:
648 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
649 sys.exit(constants.EXIT_FAILURE)
650
651 ssconf.CheckMaster(options.debug)
652
653 try:
654 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
655 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
656 except KeyError:
657 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
658 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
659 sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC and threads, so it is likewise run in
  # a separate process
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data):
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
750 """Main function"""
751 parser = OptionParser(description="Ganeti master daemon",
752 usage="%prog [-f] [-d]",
753 version="%%prog (ganeti) %s" %
754 constants.RELEASE_VERSION)
755 parser.add_option("--no-voting", dest="no_voting",
756 help="Do not check that the nodes agree on this node"
757 " being the master and start the daemon unconditionally",
758 default=False, action="store_true")
759 parser.add_option("--yes-do-it", dest="yes_do_it",
760 help="Override interactive check for --no-voting",
761 default=False, action="store_true")
762 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
763 ExecMasterd, multithreaded=True)
764