1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
# Size of the worker pool serving client (LUXI) requests; presumably
# consumed where self.request_workers is set up — confirm outside this view
CLIENT_REQUEST_WORKERS = 16

# Plain aliases re-exporting the exit codes from the constants module
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
69
70 - def RunTask(self, server, message, client):
71 """Process the request.
72
73 """
74 client_ops = ClientOps(server)
75
76 try:
77 (method, args, version) = luxi.ParseRequest(message)
78 except luxi.ProtocolError, err:
79 logging.error("Protocol Error: %s", err)
80 client.close_log()
81 return
82
83 success = False
84 try:
85
86 if version is not None and version != constants.LUXI_VERSION:
87 raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88 (constants.LUXI_VERSION, version))
89
90 result = client_ops.handle_request(method, args)
91 success = True
92 except errors.GenericError, err:
93 logging.exception("Unexpected exception")
94 success = False
95 result = errors.EncodeException(err)
96 except:
97 logging.exception("Unexpected exception")
98 err = sys.exc_info()
99 result = "Caught exception: %s" % str(err[1])
100
101 try:
102 reply = luxi.FormatResponse(success, result)
103 client.send_message(reply)
104
105 server.awaker.signal()
106 except:
107 logging.exception("Send error")
108 client.close_log()
109
112 """Handler for master peers.
113
114 """
115 _MAX_UNHANDLED = 1
116 - def __init__(self, server, connected_socket, client_address, family):
122
125
128 """Master Server.
129
130 This is the main asynchronous master server. It handles connections to the
131 master socket.
132
133 """
134 family = socket.AF_UNIX
135
  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind to a temporary name in the target directory first, fix up
    # permissions and ownership, then atomically rename into place so
    # clients never observe the socket with wrong permissions.
    # NOTE(review): tempfile.mktemp is normally race-prone; here the name
    # is only used for bind(), which fails if the path already exists.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)   # owner+group access only, set before publishing
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)  # atomic publish of the ready socket

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()  # used to wake the mainloop (e.g. for replies)

    # Filled in later — presumably by a setup method outside this view
    self.context = None
    self.request_workers = None
163
169
171 """Cleanup the server.
172
173 This involves shutting down the processor threads and the master
174 socket.
175
176 """
177 try:
178 self.close()
179 finally:
180 if self.request_workers:
181 self.request_workers.TerminateWorkers()
182 if self.context:
183 self.context.jobqueue.Shutdown()
184
187 """Class holding high-level client operations."""
190
    # Dispatch on the LUXI method name: each branch decodes its
    # arguments, logs the request and delegates either to the job queue
    # or to self._Query (which runs an opcode synchronously).
    queue = self.server.context.jobqueue

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    # NOTE(review): plain "if" instead of "elif" here; harmless because
    # the branch above always returns, but inconsistent with the chain.
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      # Generic resource query; lock queries are answered directly from
      # the lock manager, everything else goes through an opcode.
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      # Query which fields are available for a given resource type
      req = objects.QueryFieldsRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQueryFields(what=req.what,
                                                   fields=req.fields))
      elif req.what == constants.QR_LOCK:
        return query.QueryFields(query.LOCK_FIELDS, req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Build a readable description of the queried job ids for the log
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      # Old-style lock query, answered directly by the lock manager
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        # Validate before applying: must be a numeric timestamp in the future
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      # _SetWatcherPause is defined elsewhere in this file (not in view)
      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)
362
364 """Runs the specified opcode and returns the result.
365
366 """
367
368 proc = mcpu.Processor(self.server.context, None)
369
370
371
372 return proc.ExecOpCode(op, None)
373
374
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance holder; also doubles as the "initialized" flag
  # checked by __setattr__ below.
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create the cluster configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the current node/group/instance lists
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Setting this last also locks the class against further attribute
    # modification (see __setattr__); the attribute assignments above
    # only work while _instance is still None.
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # Make the job queue aware of the new node
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Re-synchronize the job queue with the node
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
448
464
468 """Check the agreement on who is the master.
469
470 The function uses a very simple algorithm: we must get more positive
471 than negative answers. Since in most of the cases we are the master,
472 we'll use our own config file for getting the node list. In the
473 future we could collect the current node list from our (possibly
474 obsolete) known nodes.
475
476 In order to account for cold-start of all nodes, we retry for up to
477 a minute until we get a real answer as the top-voted one. If the
478 nodes are more out-of-sync, for now manual startup of the master
479 should be attempted.
480
481 Note that for a even number of nodes cluster, we need at least half
482 of the nodes (beside ourselves) to vote for us. This creates a
483 problem on two-node clusters, since in this case we require the
484 other node to be up too to confirm our status.
485
486 """
487 myself = netutils.Hostname.GetSysName()
488
489 cfg = config.ConfigWriter()
490 node_list = cfg.GetNodeList()
491 del cfg
492 retries = 6
493 while retries > 0:
494 votes = bootstrap.GatherMasterVotes(node_list)
495 if not votes:
496
497 return True
498 if votes[0][0] is None:
499 retries -= 1
500 time.sleep(10)
501 continue
502 break
503 if retries == 0:
504 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
505 " after multiple retries. Aborting startup")
506 logging.critical("Use the --no-voting option if you understand what"
507 " effects it has on the cluster state")
508 return False
509
510 all_votes = sum(item[1] for item in votes)
511 top_node, top_votes = votes[0]
512
513 result = False
514 if top_node != myself:
515 logging.critical("It seems we are not the master (top-voted node"
516 " is %s with %d out of %d votes)", top_node, top_votes,
517 all_votes)
518 elif top_votes < all_votes - top_votes:
519 logging.critical("It seems we are not the master (%d votes for,"
520 " %d votes against)", top_votes, all_votes - top_votes)
521 else:
522 result = True
523
524 return result
525
535
538 """Initial checks whether to run or exit with a failure.
539
540 """
541 if args:
542 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
543 sys.exit(constants.EXIT_FAILURE)
544
545 ssconf.CheckMaster(options.debug)
546
547 try:
548 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
549 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
550 except KeyError:
551 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
552 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
553 sys.exit(constants.EXIT_FAILURE)
554
555
556 try:
557 config.ConfigWriter()
558 except errors.ConfigVersionMismatch, err:
559 v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
560 v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
561 print >> sys.stderr, \
562 ("Configuration version mismatch. The current Ganeti software"
563 " expects version %s, but the on-disk configuration file has"
564 " version %s. This is likely the result of upgrading the"
565 " software without running the upgrade procedure. Please contact"
566 " your cluster administrator or complete the upgrade using the"
567 " cfgupgrade utility, after reading the upgrade notes." %
568 (v1, v2))
569 sys.exit(constants.EXIT_FAILURE)
570 except errors.ConfigurationError, err:
571 print >> sys.stderr, \
572 ("Configuration error while opening the configuration file: %s\n"
573 "This might be caused by an incomplete software upgrade or"
574 " by a corrupted configuration file. Until the problem is fixed"
575 " the master daemon cannot start." % str(err))
576 sys.exit(constants.EXIT_FAILURE)
577
578
579
580 if options.no_voting:
581 if not options.yes_do_it:
582 sys.stdout.write("The 'no voting' option has been selected.\n")
583 sys.stdout.write("This is dangerous, please confirm by"
584 " typing uppercase 'yes': ")
585 sys.stdout.flush()
586
587 confirmation = sys.stdin.readline().strip()
588 if confirmation != "YES":
589 print >> sys.stderr, "Aborting."
590 sys.exit(constants.EXIT_FAILURE)
591
592 else:
593
594
595
596 if not utils.RunInSeparateProcess(CheckAgreement):
597 sys.exit(constants.EXIT_FAILURE)
598
599
600
601
602
603 utils.RunInSeparateProcess(ActivateMasterIP)
604
618
621 """Main master daemon function, executed with the PID file held.
622
623 """
624 (mainloop, master) = prep_data
625 try:
626 rpc.Init()
627 try:
628 master.setup_queue()
629 try:
630 mainloop.Run()
631 finally:
632 master.server_cleanup()
633 finally:
634 rpc.Shutdown()
635 finally:
636 utils.RemoveFile(constants.MASTER_SOCKET)
637
640 """Main function"""
641 parser = OptionParser(description="Ganeti master daemon",
642 usage="%prog [-f] [-d]",
643 version="%%prog (ganeti) %s" %
644 constants.RELEASE_VERSION)
645 parser.add_option("--no-voting", dest="no_voting",
646 help="Do not check that the nodes agree on this node"
647 " being the master and start the daemon unconditionally",
648 default=False, action="store_true")
649 parser.add_option("--yes-do-it", dest="yes_do_it",
650 help="Override interactive check for --no-voting",
651 default=False, action="store_true")
652 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
653 ExecMasterd, multithreaded=True)
654