"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify the client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck(object):
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated
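
    For example, while jobs are still running every call returns
    C{_CHECK_INTERVAL}; once the queue is idle, the remaining linger time
    is returned, and finally C{None}, at which point the daemon may exit.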

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the last check
      return None

    # No jobs are running anymore, but maybe some did finish recently; let
    # clients receive the status of a job before shutting down the daemon
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining

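
# _MasterShutdownCheck instances are created lazily by
# MasterServer.WaitForShutdown below; the main loop calls that method
# repeatedly, waiting for the returned number of seconds, until the check
# reports readiness by returning None.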


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Create the socket under a temporary name and rename it into place, so
    # that ownership and permissions are already correct once the socket
    # becomes visible under its final address
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)
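
    # For example, a job submission arrives here as
    # method == luxi.REQ_SUBMIT_JOB with args == (job_def, ), where job_def
    # is a list of serialized opcodes; the branches below unpack the
    # arguments accordingly for each request type.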

    # TODO: Rewrite to not exit in each 'if/elif' branch
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Currently, only queries are done using this method and locks are not
    # used, so this is safe.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if that is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run
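
  For example, C{_SetWatcherPause(context, time.time() + 3600)} asks all
  nodes to pause the watcher for roughly one hour, while
  C{_SetWatcherPause(context, None)} lets it run again.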

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.
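
  As an illustration: if "node1" gathers the votes [("node1", 3),
  ("node2", 1)] on a four-node cluster, it is top-voted with more votes
  for (3) than against (1), so its master daemon may start. If the
  top-voted node is someone else, or the votes against outnumber the
  votes for, startup is refused.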

  """
  myself = netutils.Hostname.GetSysName()

  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)