
Source Code for Module ganeti.server.masterd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Master daemon program. 
 23   
 24  Some classes deviate from the standard style guide since the 
 25  inheritance from parent classes requires it. 
 26   
 27  """ 
 28   
 29  # pylint: disable=C0103 
 30  # C0103: Invalid name ganeti-masterd 
 31   
 32  import grp 
 33  import os 
 34  import pwd 
 35  import sys 
 36  import socket 
 37  import time 
 38  import tempfile 
 39  import logging 
 40   
 41  from optparse import OptionParser 
 42   
 43  from ganeti import config 
 44  from ganeti import constants 
 45  from ganeti import daemon 
 46  from ganeti import mcpu 
 47  from ganeti import opcodes 
 48  from ganeti import jqueue 
 49  from ganeti import locking 
 50  from ganeti import luxi 
 51  from ganeti import utils 
 52  from ganeti import errors 
 53  from ganeti import ssconf 
 54  from ganeti import workerpool 
 55  from ganeti import rpc 
 56  from ganeti import bootstrap 
 57  from ganeti import netutils 
 58  from ganeti import objects 
 59  from ganeti import query 
 60  from ganeti import runtime 
 61  from ganeti import pathutils 
 62  from ganeti import ht 
 63   
 64   
 65  CLIENT_REQUEST_WORKERS = 16 
 66   
 67  EXIT_NOTMASTER = constants.EXIT_NOTMASTER 
 68  EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR 
 69 
 70 
 71  def _LogNewJob(status, info, ops):
 72    """Log information about a recently submitted job.
 73 
 74    """
 75    op_summary = utils.CommaJoin(op.Summary() for op in ops)
 76 
 77    if status:
 78      logging.info("New job with id %s, summary: %s", info, op_summary)
 79    else:
 80      logging.info("Failed to submit job, reason: '%s', summary: %s",
 81                   info, op_summary)
 82 
 83 
 84  class ClientRequestWorker(workerpool.BaseWorker):
 85    # pylint: disable=W0221
 86    def RunTask(self, server, message, client):
 87      """Process the request.
 88 
 89      """
 90      client_ops = ClientOps(server)
 91 
 92      try:
 93        (method, args, version) = luxi.ParseRequest(message)
 94      except luxi.ProtocolError, err:
 95        logging.error("Protocol Error: %s", err)
 96        client.close_log()
 97        return
 98 
 99      success = False
100      try:
101        # Verify client's version if there was one in the request
102        if version is not None and version != constants.LUXI_VERSION:
103          raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
104                                 (constants.LUXI_VERSION, version))
105 
106        result = client_ops.handle_request(method, args)
107        success = True
108      except errors.GenericError, err:
109        logging.exception("Unexpected exception")
110        success = False
111        result = errors.EncodeException(err)
112      except:
113        logging.exception("Unexpected exception")
114        err = sys.exc_info()
115        result = "Caught exception: %s" % str(err[1])
116 
117      try:
118        reply = luxi.FormatResponse(success, result)
119        client.send_message(reply)
120        # awake the main thread so that it can write out the data.
121        server.awaker.signal()
122      except: # pylint: disable=W0702
123        logging.exception("Send error")
124        client.close_log()
125 
126 
127  class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
128    """Handler for master peers.
129 
130    """
131    _MAX_UNHANDLED = 1
132 
133    def __init__(self, server, connected_socket, client_address, family):
134      daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
135                                                   client_address,
136                                                   constants.LUXI_EOM,
137                                                   family, self._MAX_UNHANDLED)
138      self.server = server
139 
140    def handle_message(self, message, _):
141      self.server.request_workers.AddTask((self.server, message, self))
142 
143 
144  class _MasterShutdownCheck:
145    """Logic for master daemon shutdown.
146 
147    """
148    #: How long to wait between checks
149    _CHECK_INTERVAL = 5.0
150 
151    #: How long to wait after all jobs are done (e.g. to give clients time to
152    #: retrieve the job status)
153    _SHUTDOWN_LINGER = 5.0
154 
155    def __init__(self):
156      """Initializes this class.
157 
158      """
159      self._had_active_jobs = None
160      self._linger_timeout = None
161 
162    def __call__(self, jq_prepare_result):
163      """Determines if master daemon is ready for shutdown.
164 
165      @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
166      @rtype: None or number
167      @return: None if master daemon is ready, timeout if the check must be
168        repeated
169 
170      """
171      if jq_prepare_result:
172        # Check again shortly
173        logging.info("Job queue has been notified for shutdown but is still"
174                     " busy; next check in %s seconds", self._CHECK_INTERVAL)
175        self._had_active_jobs = True
176        return self._CHECK_INTERVAL
177 
178      if not self._had_active_jobs:
179        # Can shut down as there were no active jobs on the first check
180        return None
181 
182      # No jobs are running anymore, but maybe some clients want to collect some
183      # information. Give them a short amount of time.
184      if self._linger_timeout is None:
185        self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
186 
187      remaining = self._linger_timeout.Remaining()
188 
189      logging.info("Job queue no longer busy; shutting down master daemon"
190                   " in %s seconds", remaining)
191 
192      # TODO: Should the master daemon socket be closed at this point? Doing so
193      # wouldn't affect existing connections.
194 
195      if remaining < 0:
196        return None
197      else:
198        return remaining
199 
200 
201  class MasterServer(daemon.AsyncStreamServer):
202    """Master Server.
203 
204    This is the main asynchronous master server. It handles connections to the
205    master socket.
206 
207    """
208    family = socket.AF_UNIX
209 
210    def __init__(self, address, uid, gid):
211      """MasterServer constructor
212 
213      @param address: the unix socket address to bind the MasterServer to
214      @param uid: The uid of the owner of the socket
215      @param gid: The gid of the owner of the socket
216 
217      """
218      temp_name = tempfile.mktemp(dir=os.path.dirname(address))
219      daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
220      os.chmod(temp_name, 0770)
221      os.chown(temp_name, uid, gid)
222      os.rename(temp_name, address)
223 
224      self.awaker = daemon.AsyncAwaker()
225 
226      # We'll only start threads once we've forked.
227      self.context = None
228      self.request_workers = None
229 
230      self._shutdown_check = None
231 
232    def handle_connection(self, connected_socket, client_address):
233      # TODO: add connection count and limit the number of open connections to a
234      # maximum number to avoid breaking for lack of file descriptors or memory.
235      MasterClientHandler(self, connected_socket, client_address, self.family)
236 
237    def setup_queue(self):
238      self.context = GanetiContext()
239      self.request_workers = workerpool.WorkerPool("ClientReq",
240                                                   CLIENT_REQUEST_WORKERS,
241                                                   ClientRequestWorker)
242 
243    def WaitForShutdown(self):
244      """Prepares server for shutdown.
245 
246      """
247      if self._shutdown_check is None:
248        self._shutdown_check = _MasterShutdownCheck()
249 
250      return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
251 
252    def server_cleanup(self):
253      """Cleanup the server.
254 
255      This involves shutting down the processor threads and the master
256      socket.
257 
258      """
259      try:
260        self.close()
261      finally:
262        if self.request_workers:
263          self.request_workers.TerminateWorkers()
264        if self.context:
265          self.context.jobqueue.Shutdown()
266 
267 
268  class ClientOps:
269    """Class holding high-level client operations."""
270    def __init__(self, server):
271      self.server = server
272 
273    def handle_request(self, method, args): # pylint: disable=R0911
274      context = self.server.context
275      queue = context.jobqueue
276 
277      # TODO: Parameter validation
278      if not isinstance(args, (tuple, list)):
279        logging.info("Received invalid arguments of type '%s'", type(args))
280        raise ValueError("Invalid arguments type '%s'" % type(args))
281 
282      # TODO: Rewrite to not exit in each 'if/elif' branch
283 
284      if method == luxi.REQ_SUBMIT_JOB:
285        logging.info("Receiving new job")
286        (job_def, ) = args
287        ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
288        job_id = queue.SubmitJob(ops)
289        _LogNewJob(True, job_id, ops)
290        return job_id
291 
292      elif method == luxi.REQ_SUBMIT_MANY_JOBS:
293        logging.info("Receiving multiple jobs")
294        (job_defs, ) = args
295        jobs = []
296        for ops in job_defs:
297          jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
298        job_ids = queue.SubmitManyJobs(jobs)
299        for ((status, job_id), ops) in zip(job_ids, jobs):
300          _LogNewJob(status, job_id, ops)
301        return job_ids
302 
303      elif method == luxi.REQ_CANCEL_JOB:
304        (job_id, ) = args
305        logging.info("Received job cancel request for %s", job_id)
306        return queue.CancelJob(job_id)
307 
308      elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
309        (job_id, priority) = args
310        logging.info("Received request to change priority for job %s to %s",
311                     job_id, priority)
312        return queue.ChangeJobPriority(job_id, priority)
313 
314      elif method == luxi.REQ_ARCHIVE_JOB:
315        (job_id, ) = args
316        logging.info("Received job archive request for %s", job_id)
317        return queue.ArchiveJob(job_id)
318 
319      elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
320        (age, timeout) = args
321        logging.info("Received job autoarchive request for age %s, timeout %s",
322                     age, timeout)
323        return queue.AutoArchiveJobs(age, timeout)
324 
325      elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
326        (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
327        logging.info("Received job poll request for %s", job_id)
328        return queue.WaitForJobChanges(job_id, fields, prev_job_info,
329                                       prev_log_serial, timeout)
330 
331      elif method == luxi.REQ_QUERY:
332        (what, fields, qfilter) = args
333 
334        if what in constants.QR_VIA_OP:
335          result = self._Query(opcodes.OpQuery(what=what, fields=fields,
336                                               qfilter=qfilter))
337        elif what == constants.QR_LOCK:
338          if qfilter is not None:
339            raise errors.OpPrereqError("Lock queries can't be filtered",
340                                       errors.ECODE_INVAL)
341          return context.glm.QueryLocks(fields)
342        elif what == constants.QR_JOB:
343          return queue.QueryJobs(fields, qfilter)
344        elif what in constants.QR_VIA_LUXI:
345          raise NotImplementedError
346        else:
347          raise errors.OpPrereqError("Resource type '%s' unknown" % what,
348                                     errors.ECODE_INVAL)
349 
350        return result
351 
352      elif method == luxi.REQ_QUERY_FIELDS:
353        (what, fields) = args
354        req = objects.QueryFieldsRequest(what=what, fields=fields)
355 
356        try:
357          fielddefs = query.ALL_FIELDS[req.what]
358        except KeyError:
359          raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
360                                     errors.ECODE_INVAL)
361 
362        return query.QueryFields(fielddefs, req.fields)
363 
364      elif method == luxi.REQ_QUERY_JOBS:
365        (job_ids, fields) = args
366        if isinstance(job_ids, (tuple, list)) and job_ids:
367          msg = utils.CommaJoin(job_ids)
368        else:
369          msg = str(job_ids)
370        logging.info("Received job query request for %s", msg)
371        return queue.OldStyleQueryJobs(job_ids, fields)
372 
373      elif method == luxi.REQ_QUERY_INSTANCES:
374        (names, fields, use_locking) = args
375        logging.info("Received instance query request for %s", names)
376        if use_locking:
377          raise errors.OpPrereqError("Sync queries are not allowed",
378                                     errors.ECODE_INVAL)
379        op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
380                                     use_locking=use_locking)
381        return self._Query(op)
382 
383      elif method == luxi.REQ_QUERY_NODES:
384        (names, fields, use_locking) = args
385        logging.info("Received node query request for %s", names)
386        if use_locking:
387          raise errors.OpPrereqError("Sync queries are not allowed",
388                                     errors.ECODE_INVAL)
389        op = opcodes.OpNodeQuery(names=names, output_fields=fields,
390                                 use_locking=use_locking)
391        return self._Query(op)
392 
393      elif method == luxi.REQ_QUERY_GROUPS:
394        (names, fields, use_locking) = args
395        logging.info("Received group query request for %s", names)
396        if use_locking:
397          raise errors.OpPrereqError("Sync queries are not allowed",
398                                     errors.ECODE_INVAL)
399        op = opcodes.OpGroupQuery(names=names, output_fields=fields)
400        return self._Query(op)
401 
402      elif method == luxi.REQ_QUERY_NETWORKS:
403        (names, fields, use_locking) = args
404        logging.info("Received network query request for %s", names)
405        if use_locking:
406          raise errors.OpPrereqError("Sync queries are not allowed",
407                                     errors.ECODE_INVAL)
408        op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
409        return self._Query(op)
410 
411      elif method == luxi.REQ_QUERY_EXPORTS:
412        (nodes, use_locking) = args
413        if use_locking:
414          raise errors.OpPrereqError("Sync queries are not allowed",
415                                     errors.ECODE_INVAL)
416        logging.info("Received exports query request")
417        op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
418        return self._Query(op)
419 
420      elif method == luxi.REQ_QUERY_CONFIG_VALUES:
421        (fields, ) = args
422        logging.info("Received config values query request for %s", fields)
423        op = opcodes.OpClusterConfigQuery(output_fields=fields)
424        return self._Query(op)
425 
426      elif method == luxi.REQ_QUERY_CLUSTER_INFO:
427        logging.info("Received cluster info query request")
428        op = opcodes.OpClusterQuery()
429        return self._Query(op)
430 
431      elif method == luxi.REQ_QUERY_TAGS:
432        (kind, name) = args
433        logging.info("Received tags query request")
434        op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
435        return self._Query(op)
436 
437      elif method == luxi.REQ_SET_DRAIN_FLAG:
438        (drain_flag, ) = args
439        logging.info("Received queue drain flag change request to %s",
440                     drain_flag)
441        return queue.SetDrainFlag(drain_flag)
442 
443      elif method == luxi.REQ_SET_WATCHER_PAUSE:
444        (until, ) = args
445 
446        return _SetWatcherPause(context, until)
447 
448      else:
449        logging.info("Received invalid request '%s'", method)
450        raise ValueError("Invalid operation '%s'" % method)
451 
452    def _Query(self, op):
453      """Runs the specified opcode and returns the result.
454 
455      """
456      # Queries don't have a job id
457      proc = mcpu.Processor(self.server.context, None, enable_locks=False)
458 
459      # TODO: Executing an opcode using locks will acquire them in blocking mode.
460      # Consider using a timeout for retries.
461      return proc.ExecOpCode(op, None)
462 
463 
464  class GanetiContext(object):
465    """Context common to all ganeti threads.
466 
467    This class creates and holds common objects shared by all threads.
468 
469    """
470    # pylint: disable=W0212
471    # we do want to ensure a singleton here
472    _instance = None
473 
474    def __init__(self):
475      """Constructs a new GanetiContext object.
476 
477      There should be only a GanetiContext object at any time, so this
478      function raises an error if this is not the case.
479 
480      """
481      assert self.__class__._instance is None, "double GanetiContext instance"
482 
483      # Create global configuration object
484      self.cfg = config.ConfigWriter()
485 
486      # Locking manager
487      self.glm = locking.GanetiLockManager(
488        self.cfg.GetNodeList(),
489        self.cfg.GetNodeGroupList(),
490        self.cfg.GetInstanceList(),
491        self.cfg.GetNetworkList())
492 
493      self.cfg.SetContext(self)
494 
495      # RPC runner
496      self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
497 
498      # Job queue
499      self.jobqueue = jqueue.JobQueue(self)
500 
501      # setting this also locks the class against attribute modifications
502      self.__class__._instance = self
503 
504    def __setattr__(self, name, value):
505      """Setting GanetiContext attributes is forbidden after initialization.
506 
507      """
508      assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
509      object.__setattr__(self, name, value)
510 
511    def AddNode(self, node, ec_id):
512      """Adds a node to the configuration and lock manager.
513 
514      """
515      # Add it to the configuration
516      self.cfg.AddNode(node, ec_id)
517 
518      # If preseeding fails it'll not be added
519      self.jobqueue.AddNode(node)
520 
521      # Add the new node to the Ganeti Lock Manager
522      self.glm.add(locking.LEVEL_NODE, node.name)
523      self.glm.add(locking.LEVEL_NODE_RES, node.name)
524 
525    def ReaddNode(self, node):
526      """Updates a node that's already in the configuration
527 
528      """
529      # Synchronize the queue again
530      self.jobqueue.AddNode(node)
531 
532    def RemoveNode(self, name):
533      """Removes a node from the configuration and lock manager.
534 
535      """
536      # Remove node from configuration
537      self.cfg.RemoveNode(name)
538 
539      # Notify job queue
540      self.jobqueue.RemoveNode(name)
541 
542      # Remove the node from the Ganeti Lock Manager
543      self.glm.remove(locking.LEVEL_NODE, name)
544      self.glm.remove(locking.LEVEL_NODE_RES, name)
545 
546 
547  def _SetWatcherPause(context, until):
548    """Creates or removes the watcher pause file.
549 
550    @type context: L{GanetiContext}
551    @param context: Global Ganeti context
552    @type until: None or int
553    @param until: Unix timestamp saying until when the watcher shouldn't run
554 
555    """
556    node_names = context.cfg.GetNodeList()
557 
558    if until is None:
559      logging.info("Received request to no longer pause watcher")
560    else:
561      if not ht.TNumber(until):
562        raise TypeError("Duration must be numeric")
563 
564      if until < time.time():
565        raise errors.GenericError("Unable to set pause end time in the past")
566 
567      logging.info("Received request to pause watcher until %s", until)
568 
569    result = context.rpc.call_set_watcher_pause(node_names, until)
570 
571    errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
572                             for (node_name, nres) in result.items()
573                             if nres.fail_msg and not nres.offline)
574    if errmsg:
575      raise errors.OpExecError("Watcher pause was set where possible, but failed"
576                               " on the following node(s): %s" % errmsg)
577 
578    return until
579 
580 
581  @rpc.RunWithRPC
582  def CheckAgreement():
583    """Check the agreement on who is the master.
584 
585    The function uses a very simple algorithm: we must get more positive
586    than negative answers. Since in most of the cases we are the master,
587    we'll use our own config file for getting the node list. In the
588    future we could collect the current node list from our (possibly
589    obsolete) known nodes.
590 
591    In order to account for cold-start of all nodes, we retry for up to
592    a minute until we get a real answer as the top-voted one. If the
593    nodes are more out-of-sync, for now manual startup of the master
594    should be attempted.
595 
596    Note that for a cluster with an even number of nodes, we need at least
597    half of the nodes (besides ourselves) to vote for us. This creates a
598    problem on two-node clusters, since in this case we require the
599    other node to be up too to confirm our status.
600 
601    """
602    myself = netutils.Hostname.GetSysName()
603    #temp instantiation of a config writer, used only to get the node list
604    cfg = config.ConfigWriter()
605    node_list = cfg.GetNodeList()
606    del cfg
607    retries = 6
608    while retries > 0:
609      votes = bootstrap.GatherMasterVotes(node_list)
610      if not votes:
611        # empty node list, this is a one node cluster
612        return True
613      if votes[0][0] is None:
614        retries -= 1
615        time.sleep(10)
616        continue
617      break
618    if retries == 0:
619      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
620                       " after multiple retries. Aborting startup")
621      logging.critical("Use the --no-voting option if you understand what"
622                       " effects it has on the cluster state")
623      return False
624    # here a real node is at the top of the list
625    all_votes = sum(item[1] for item in votes)
626    top_node, top_votes = votes[0]
627 
628    result = False
629    if top_node != myself:
630      logging.critical("It seems we are not the master (top-voted node"
631                       " is %s with %d out of %d votes)", top_node, top_votes,
632                       all_votes)
633    elif top_votes < all_votes - top_votes:
634      logging.critical("It seems we are not the master (%d votes for,"
635                       " %d votes against)", top_votes, all_votes - top_votes)
636    else:
637      result = True
638 
639    return result
640 
641 
642  @rpc.RunWithRPC
643  def ActivateMasterIP():
644    # activate ip
645    cfg = config.ConfigWriter()
646    master_params = cfg.GetMasterNetworkParameters()
647    ems = cfg.GetUseExternalMipScript()
648    runner = rpc.BootstrapRunner()
649    result = runner.call_node_activate_master_ip(master_params.name,
650                                                 master_params, ems)
651 
652    msg = result.fail_msg
653    if msg:
654      logging.error("Can't activate master IP address: %s", msg)
655 
656 
657  def CheckMasterd(options, args):
658    """Initial checks whether to run or exit with a failure.
659 
660    """
661    if args: # masterd doesn't take any arguments
662      print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
663      sys.exit(constants.EXIT_FAILURE)
664 
665    ssconf.CheckMaster(options.debug)
666 
667    try:
668      options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
669      options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
670    except KeyError:
671      print >> sys.stderr, ("User or group not existing on system: %s:%s" %
672                            (constants.MASTERD_USER, constants.DAEMONS_GROUP))
673      sys.exit(constants.EXIT_FAILURE)
674 
675    # Determine static runtime architecture information
676    runtime.InitArchInfo()
677 
678    # Check the configuration is sane before anything else
679    try:
680      config.ConfigWriter()
681    except errors.ConfigVersionMismatch, err:
682      v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
683      v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
684      print >> sys.stderr, \
685        ("Configuration version mismatch. The current Ganeti software"
686         " expects version %s, but the on-disk configuration file has"
687         " version %s. This is likely the result of upgrading the"
688         " software without running the upgrade procedure. Please contact"
689         " your cluster administrator or complete the upgrade using the"
690         " cfgupgrade utility, after reading the upgrade notes." %
691         (v1, v2))
692      sys.exit(constants.EXIT_FAILURE)
693    except errors.ConfigurationError, err:
694      print >> sys.stderr, \
695        ("Configuration error while opening the configuration file: %s\n"
696         "This might be caused by an incomplete software upgrade or"
697         " by a corrupted configuration file. Until the problem is fixed"
698         " the master daemon cannot start." % str(err))
699      sys.exit(constants.EXIT_FAILURE)
700 
701    # If CheckMaster didn't fail we believe we are the master, but we have to
702    # confirm with the other nodes.
703    if options.no_voting:
704      if not options.yes_do_it:
705        sys.stdout.write("The 'no voting' option has been selected.\n")
706        sys.stdout.write("This is dangerous, please confirm by"
707                         " typing uppercase 'yes': ")
708        sys.stdout.flush()
709 
710        confirmation = sys.stdin.readline().strip()
711        if confirmation != "YES":
712          print >> sys.stderr, "Aborting."
713          sys.exit(constants.EXIT_FAILURE)
714 
715    else:
716      # CheckAgreement uses RPC and threads, hence it needs to be run in
717      # a separate process before we call utils.Daemonize in the current
718      # process.
719      if not utils.RunInSeparateProcess(CheckAgreement):
720        sys.exit(constants.EXIT_FAILURE)
721 
722      # ActivateMasterIP also uses RPC/threads, so we run it again via a
723      # separate process.
724 
725      # TODO: decide whether failure to activate the master IP is a fatal error
726      utils.RunInSeparateProcess(ActivateMasterIP)
727 
728 
729  def PrepMasterd(options, _):
730    """Prep master daemon function, executed with the PID file held.
731 
732    """
733    # This is safe to do as the pid file guarantees against
734    # concurrent execution.
735    utils.RemoveFile(pathutils.MASTER_SOCKET)
736 
737    mainloop = daemon.Mainloop()
738    master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
739    return (mainloop, master)
740 
741 
742  def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
743    """Main master daemon function, executed with the PID file held.
744 
745    """
746    (mainloop, master) = prep_data
747    try:
748      rpc.Init()
749      try:
750        master.setup_queue()
751        try:
752          mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
753        finally:
754          master.server_cleanup()
755      finally:
756        rpc.Shutdown()
757    finally:
758      utils.RemoveFile(pathutils.MASTER_SOCKET)
759 
760    logging.info("Clean master daemon shutdown")
761 
762 
763  def Main():
764    """Main function"""
765    parser = OptionParser(description="Ganeti master daemon",
766                          usage="%prog [-f] [-d]",
767                          version="%%prog (ganeti) %s" %
768                          constants.RELEASE_VERSION)
769    parser.add_option("--no-voting", dest="no_voting",
770                      help="Do not check that the nodes agree on this node"
771                      " being the master and start the daemon unconditionally",
772                      default=False, action="store_true")
773    parser.add_option("--yes-do-it", dest="yes_do_it",
774                      help="Override interactive check for --no-voting",
775                      default=False, action="store_true")
776    daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
777                       ExecMasterd, multithreaded=True)
778 
778
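
The request path shown above (MasterServer accepts a connection on the master Unix
socket, ClientRequestWorker parses the message, ClientOps.handle_request dispatches
on the REQ_* constant) is the server side of the LUXI protocol. For orientation, a
minimal client-side sketch follows; it assumes the luxi.Client wrapper with its
default master socket address from the same Ganeti installation, and the exact
helper names may differ between Ganeti versions.

  # Hedged sketch, not part of masterd itself: the client methods mirror the
  # REQ_* constants handled in ClientOps.handle_request above.
  from ganeti import luxi
  from ganeti import opcodes

  client = luxi.Client()                 # assumes the default master socket path
  cluster = client.QueryClusterInfo()    # served via REQ_QUERY_CLUSTER_INFO
  print cluster["name"]

  # Job submission sends serialized opcodes; handle_request rebuilds them with
  # opcodes.OpCode.LoadOpCode() and hands them to the job queue.
  job_id = client.SubmitJob([opcodes.OpClusterQuery()])
  print "submitted job", job_id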