
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide because the parent
classes they inherit from require it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  if status:
    logging.info("New job with id %s, summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except:  # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
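
# Illustrative sketch, not part of the original module: the "message" handled
# above is one LUXI request.  Based on luxi.ParseRequest/FormatResponse as
# used here, a request carries a method name, an argument list and an
# optional protocol version, and the reply mirrors it with a success flag
# and a result.  Assuming LUXI's JSON encoding and these key names, an
# exchange looks roughly like:
#
#   request:  {"method": "SubmitJob", "args": [[...opcodes...]], "version": N}
#   reply:    {"success": true, "result": 12345}
#
# On a version mismatch or an unhandled error the reply carries
# success=False and an encoded exception instead of a normal result.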


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
      repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
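
# Illustrative sketch, not part of the original module: the values returned
# by _MasterShutdownCheck.__call__ over successive polls.  Assuming the main
# loop re-invokes MasterServer.WaitForShutdown whenever a timeout is
# returned, a shutdown while jobs are still running proceeds roughly as:
#
#   check(True)   -> 5.0    # queue busy, poll again in _CHECK_INTERVAL
#   check(True)   -> 5.0
#   check(False)  -> ~5.0   # queue idle, _SHUTDOWN_LINGER window starts
#   check(False)  -> None   # linger expired, the daemon may exit
#
# If the queue is already idle on the very first call, None is returned
# immediately and no linger period is applied.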


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):  # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
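
# Illustrative sketch, not part of the original module: the requests handled
# above normally originate from the client side of the LUXI library.  A
# hypothetical caller could look roughly like this (luxi.Client and its
# method names are assumed to correspond to the REQ_* constants used above):
#
#   cl = luxi.Client(address=constants.MASTER_SOCKET)
#   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])
#   status = cl.QueryJobs([job_id], ["status"])
#
# Each such call becomes one serialised request, is dispatched by
# ClientOps.handle_request and is answered either from the job queue or by
# running an opcode through _Query.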


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until
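
# Illustrative note, not part of the original module: the pause file written
# above contains only the decimal Unix timestamp followed by a newline, so
# for example _SetWatcherPause(1500000000) leaves a file holding
# "1500000000\n", while _SetWatcherPause(None) removes the file again.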


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
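
# Illustrative sketch, not part of the original module: the vote test used
# above on a hypothetical cluster where 3 answering nodes name us as master
# and 2 name another node:
#
#   all_votes = 5, top_node = ourselves, top_votes = 3
#   top_votes < all_votes - top_votes  ->  3 < 2  ->  false, so we proceed
#
# On a two-node cluster the other node must be reachable and agree with us
# for the vote to succeed, which is the limitation the docstring above
# points out; otherwise the daemon has to be started with --no-voting.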


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args:  # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data):  # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
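
# Illustrative summary, not part of the original module: daemon.GenericMain
# drives the three hook functions defined above.  Roughly (exact call
# sequence inside GenericMain is assumed):
#
#   options, args = parser.parse_args()
#   CheckMasterd(options, args)            # pre-fork checks, config, voting
#   ... daemonise and acquire the PID file ...
#   prep_data = PrepMasterd(options, args) # remove stale socket, build server
#   ExecMasterd(options, args, prep_data)  # init RPC, run mainloop until
#                                          # WaitForShutdown allows exit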