
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)
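
For illustration, the helper above produces one log line per submission
attempt. With a single OpClusterQuery opcode (whose Summary() should render
roughly as CLUSTER_QUERY; the exact string comes from the opcode definitions,
so treat it as an assumption), the two paths look like:

  ops = [opcodes.OpClusterQuery()]
  _LogNewJob(True, 1234, ops)
  # INFO New job with id 1234, summary: CLUSTER_QUERY
  _LogNewJob(False, "queue drained", ops)
  # INFO Failed to submit job, reason: 'queue drained', summary: CLUSTER_QUERY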


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except:  # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
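
RunTask is the server half of a small request/response protocol:
luxi.ParseRequest unpacks one serialized message into (method, args, version)
and luxi.FormatResponse builds the reply. A sketch of the shapes involved
(the exact wire keys live in ganeti.luxi; the names below are an assumption
based on how the tuple is used here):

  # Request, as sent by a LUXI client (JSON-encoded):
  #   {"method": "SubmitJob", "args": [[<serialized opcodes>]],
  #    "version": constants.LUXI_VERSION}
  #
  # Reply, as built by luxi.FormatResponse(success, result):
  #   {"success": True, "result": 1234}                 # e.g. a new job id
  #   {"success": False, "result": <encoded exception>}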


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
      repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
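
__call__ above returns None when shutdown may proceed and a timeout in
seconds when the caller must ask again. daemon.Mainloop drives this through
its shutdown_wait_fn argument (see ExecMasterd below); a stripped-down driver
that honours the same convention, for illustration only:

  def _DriveShutdownCheck(check, prepare_fn, sleep_fn=time.sleep):
    # check: a _MasterShutdownCheck instance; prepare_fn: e.g. the job
    # queue's PrepareShutdown. Loops until the check says shutdown is safe.
    while True:
      timeout = check(prepare_fn())
      if timeout is None:
        return
      sleep_fn(timeout)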


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
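
__init__ above binds the listening socket under a temporary name, fixes mode
and ownership, and only then renames it onto the public address, so a client
can never connect to a socket with the wrong permissions (rename(2) is
atomic). The same pattern in isolation, as a stdlib-only sketch with a
hypothetical helper name:

  def _BindUnixSocket(path, uid, gid, mode=0770):
    # Restates the publish-after-chmod trick used by MasterServer.__init__
    tmp = tempfile.mktemp(dir=os.path.dirname(path))
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(tmp)          # the socket file exists, but under a private name
    os.chmod(tmp, mode)     # permissions are fixed while still private
    os.chown(tmp, uid, gid)
    os.rename(tmp, path)    # atomically publish under the real address
    sock.listen(5)
    return sock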


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):  # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
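
Each luxi.REQ_* constant handled above corresponds to a method on the client
side. Assuming the luxi.Client class from the same tree (method names are
inferred from the request constants, so treat this as a sketch rather than an
API reference), the caller's view is roughly:

  # cl = luxi.Client(address=pathutils.MASTER_SOCKET)
  # job_id = cl.SubmitJob([opcodes.OpClusterQuery()])  # luxi.REQ_SUBMIT_JOB
  # cl.QueryJobs([job_id], ["status"])                 # luxi.REQ_QUERY_JOBS
  # cl.CancelJob(job_id)                               # luxi.REQ_CANCEL_JOB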


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
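
GanetiContext combines a class-level singleton guard with a __setattr__ that
freezes the instance once construction completes: the final assignment in
__init__ goes to the class attribute (bypassing the instance __setattr__) and
thereby arms the guard. The pattern distilled, free of the Ganeti objects:

  class _WriteOnceSingleton(object):
    _instance = None

    def __init__(self):
      assert self.__class__._instance is None, "double instance"
      self.payload = 42                # passes through __setattr__; allowed
      self.__class__._instance = self  # class attribute: arms the guard

    def __setattr__(self, name, value):
      assert self.__class__._instance is None, "instance is frozen"
      object.__setattr__(self, name, value)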


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
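
Note that until is an absolute Unix timestamp, not a duration. For example,
pausing the watcher for half an hour and then lifting the pause again would
look like:

  # _SetWatcherPause(context, time.time() + 30 * 60)  # pause until now + 30m
  # _SetWatcherPause(context, None)                   # let the watcher run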


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are too far out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
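
The acceptance rule at the end reduces to top_votes >= all_votes - top_votes,
i.e. we proceed only if no more votes went to other nodes than to us.
Restated and checked, for illustration only:

  def _HasMajority(top_votes, all_votes):
    # Same test as the 'top_votes < all_votes - top_votes' rejection above
    return top_votes >= all_votes - top_votes

  assert _HasMajority(3, 5)      # 3 votes for us, 2 for others: proceed
  assert not _HasMajority(2, 5)  # 2 votes for us, 3 for others: abort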


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here yet
  result = runner.call_node_activate_master_ip(
    cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:  # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

    # ActivateMasterIP also uses RPC/threads, so we run it again via a
    # separate process.

    # TODO: decide whether failure to activate the master IP is a fatal error
    utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data):  # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
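
daemon.GenericMain ties the three phases together: CheckMasterd runs first,
PrepMasterd runs with the PID file already held, and ExecMasterd enters the
main loop. The installed ganeti-masterd launcher is then presumably no more
than the equivalent of:

  # from ganeti.server import masterd
  # masterd.Main()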