
Source Code for Module ganeti.server.masterd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Master daemon program. 
 32   
  33  Some classes deviate from the standard style guide since the 
 34  inheritance from parent classes requires it. 
 35   
 36  """ 
 37   
 38  # pylint: disable=C0103 
 39  # C0103: Invalid name ganeti-masterd 
 40   
 41  import grp 
 42  import os 
 43  import pwd 
 44  import sys 
 45  import socket 
 46  import time 
 47  import tempfile 
 48  import logging 
 49   
 50  from optparse import OptionParser 
 51   
 52  from ganeti import config 
 53  from ganeti import constants 
 54  from ganeti import daemon 
 55  from ganeti import mcpu 
 56  from ganeti import opcodes 
 57  from ganeti import jqueue 
 58  from ganeti import locking 
 59  from ganeti import luxi 
 60  import ganeti.rpc.errors as rpcerr 
 61  from ganeti import utils 
 62  from ganeti import errors 
 63  from ganeti import ssconf 
 64  from ganeti import workerpool 
 65  import ganeti.rpc.node as rpc 
 66  import ganeti.rpc.client as rpccl 
 67  from ganeti import bootstrap 
 68  from ganeti import netutils 
 69  from ganeti import objects 
 70  from ganeti import query 
 71  from ganeti import runtime 
 72  from ganeti import pathutils 
 73  from ganeti import ht 
 74   
 75  from ganeti.utils import version 
 76   
 77   
 78  CLIENT_REQUEST_WORKERS = 16 
 79   
 80  EXIT_NOTMASTER = constants.EXIT_NOTMASTER 
 81  EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR 
 82  
 83  
 84  def _LogNewJob(status, info, ops):
 85    """Log information about a recently submitted job.
 86  
 87    """
 88    op_summary = utils.CommaJoin(op.Summary() for op in ops)
 89  
 90    if status:
 91      logging.info("New job with id %s, summary: %s", info, op_summary)
 92    else:
 93      logging.info("Failed to submit job, reason: '%s', summary: %s",
 94                   info, op_summary)
 95  
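Editorial aside (not part of the module source): _LogNewJob is later called with two conventions, a successful submission where info carries the new job id, and a failed one where info carries the reason. A minimal illustration with made-up values; ops stands for a list of opcode objects:

    _LogNewJob(True, job_id, ops)
    _LogNewJob(False, "queue is drained", ops)   # reason string is illustrative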
 96  
 97  class ClientRequestWorker(workerpool.BaseWorker):
 98    # pylint: disable=W0221
 99    def RunTask(self, server, message, client):
100      """Process the request.
101  
102      """
103      client_ops = ClientOps(server)
104  
105      try:
106        (method, args, ver) = rpccl.ParseRequest(message)
107      except rpcerr.ProtocolError, err:
108        logging.error("Protocol Error: %s", err)
109        client.close_log()
110        return
111  
112      success = False
113      try:
114        # Verify client's version if there was one in the request
115        if ver is not None and ver != constants.LUXI_VERSION:
116          raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
117                                 (constants.LUXI_VERSION, ver))
118  
119        result = client_ops.handle_request(method, args)
120        success = True
121      except errors.GenericError, err:
122        logging.exception("Unexpected exception")
123        success = False
124        result = errors.EncodeException(err)
125      except:
126        logging.exception("Unexpected exception")
127        err = sys.exc_info()
128        result = "Caught exception: %s" % str(err[1])
129  
130      try:
131        reply = rpccl.FormatResponse(success, result)
132        client.send_message(reply)
133        # awake the main thread so that it can write out the data.
134        server.awaker.signal()
135      except:  # pylint: disable=W0702
136        logging.exception("Send error")
137        client.close_log()
138  
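Editorial aside (not part of the module source): the worker above turns each LUXI message into a (method, args, version) triple and answers with a (success, result) pair. A self-contained sketch of that round trip using plain JSON instead of the real ganeti.rpc.client helpers; the field names "method", "args", "version", "success" and "result" are assumptions about the wire format, not a specification of it:

    import json

    def _fake_round_trip(handle_request, message):
      # mirrors RunTask: parse, dispatch, wrap the outcome in a response
      request = json.loads(message)
      try:
        result = handle_request(request["method"], request["args"])
        success = True
      except Exception as err:          # the real worker also has a catch-all
        success, result = False, str(err)
      return json.dumps({"success": success, "result": result})

    msg = json.dumps({"method": "QueryClusterInfo", "args": [], "version": None})
    print(_fake_round_trip(lambda method, args: {"name": "cluster.example.com"},
                           msg))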
139  
140  class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
141    """Handler for master peers.
142  
143    """
144    _MAX_UNHANDLED = 1
145  
146    def __init__(self, server, connected_socket, client_address, family):
147      daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
148                                                   client_address,
149                                                   constants.LUXI_EOM,
150                                                   family, self._MAX_UNHANDLED)
151      self.server = server
152  
153    def handle_message(self, message, _):
154      self.server.request_workers.AddTask((self.server, message, self))
155  
156  
157  class _MasterShutdownCheck(object):
158    """Logic for master daemon shutdown.
159  
160    """
161    #: How long to wait between checks
162    _CHECK_INTERVAL = 5.0
163  
164    #: How long to wait after all jobs are done (e.g. to give clients time to
165    #: retrieve the job status)
166    _SHUTDOWN_LINGER = 5.0
167  
168    def __init__(self):
169      """Initializes this class.
170  
171      """
172      self._had_active_jobs = None
173      self._linger_timeout = None
174  
175    def __call__(self, jq_prepare_result):
176      """Determines if master daemon is ready for shutdown.
177  
178      @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
179      @rtype: None or number
180      @return: None if master daemon is ready, timeout if the check must be
181        repeated
182  
183      """
184      if jq_prepare_result:
185        # Check again shortly
186        logging.info("Job queue has been notified for shutdown but is still"
187                     " busy; next check in %s seconds", self._CHECK_INTERVAL)
188        self._had_active_jobs = True
189        return self._CHECK_INTERVAL
190  
191      if not self._had_active_jobs:
192        # Can shut down as there were no active jobs on the first check
193        return None
194  
195      # No jobs are running anymore, but maybe some clients want to collect some
196      # information. Give them a short amount of time.
197      if self._linger_timeout is None:
198        self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
199  
200      remaining = self._linger_timeout.Remaining()
201  
202      logging.info("Job queue no longer busy; shutting down master daemon"
203                   " in %s seconds", remaining)
204  
205      # TODO: Should the master daemon socket be closed at this point? Doing so
206      # wouldn't affect existing connections.
207  
208      if remaining < 0:
209        return None
210      else:
211        return remaining
212  
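Editorial aside (illustration only): the contract of __call__ is "None means ready to shut down, a number means ask again in that many seconds". A hedged sketch of how a caller such as the daemon mainloop could drive it, with a stubbed probe standing in for jqueue.JobQueue.PrepareShutdown; if executed it simply sleeps for the returned intervals:

    import time

    def _wait_until_ready(check, jobqueue_busy_probe):
      # poll the check object until it reports readiness (returns None)
      while True:
        timeout = check(jobqueue_busy_probe())
        if timeout is None:
          return
        time.sleep(timeout)

    polls = [True]   # pretend the queue is busy on the first check, idle after
    probe = lambda: polls.pop(0) if polls else False
    _wait_until_ready(_MasterShutdownCheck(), probe)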
213  
214  class MasterServer(daemon.AsyncStreamServer):
215    """Master Server.
216  
217    This is the main asynchronous master server. It handles connections to the
218    master socket.
219  
220    """
221    family = socket.AF_UNIX
222  
223    def __init__(self, address, uid, gid):
224      """MasterServer constructor
225  
226      @param address: the unix socket address to bind the MasterServer to
227      @param uid: The uid of the owner of the socket
228      @param gid: The gid of the owner of the socket
229  
230      """
231      temp_name = tempfile.mktemp(dir=os.path.dirname(address))
232      daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
233      os.chmod(temp_name, 0770)
234      os.chown(temp_name, uid, gid)
235      os.rename(temp_name, address)
236  
237      self.awaker = daemon.AsyncAwaker()
238  
239      # We'll only start threads once we've forked.
240      self.context = None
241      self.request_workers = None
242  
243      self._shutdown_check = None
244  
245    def handle_connection(self, connected_socket, client_address):
246      # TODO: add connection count and limit the number of open connections to a
247      # maximum number to avoid breaking for lack of file descriptors or memory.
248      MasterClientHandler(self, connected_socket, client_address, self.family)
249  
250    def setup_queue(self):
251      self.context = GanetiContext()
252      self.request_workers = workerpool.WorkerPool("ClientReq",
253                                                   CLIENT_REQUEST_WORKERS,
254                                                   ClientRequestWorker)
255  
256    def WaitForShutdown(self):
257      """Prepares server for shutdown.
258  
259      """
260      if self._shutdown_check is None:
261        self._shutdown_check = _MasterShutdownCheck()
262  
263      return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
264  
265    def server_cleanup(self):
266      """Cleanup the server.
267  
268      This involves shutting down the processor threads and the master
269      socket.
270  
271      """
272      try:
273        self.close()
274      finally:
275        if self.request_workers:
276          self.request_workers.TerminateWorkers()
277        if self.context:
278          self.context.jobqueue.Shutdown()
279  
280  
281  class ClientOps(object):
282    """Class holding high-level client operations."""
283    def __init__(self, server):
284      self.server = server
285  
286    def handle_request(self, method, args):  # pylint: disable=R0911
287      context = self.server.context
288      queue = context.jobqueue
289  
290      # TODO: Parameter validation
291      if not isinstance(args, (tuple, list)):
292        logging.info("Received invalid arguments of type '%s'", type(args))
293        raise ValueError("Invalid arguments type '%s'" % type(args))
294  
295      if method not in luxi.REQ_ALL:
296        logging.info("Received invalid request '%s'", method)
297        raise ValueError("Invalid operation '%s'" % method)
298  
299      # TODO: Rewrite to not exit in each 'if/elif' branch
300  
301      if method == luxi.REQ_SUBMIT_JOB:
302        logging.info("Receiving new job")
303        (job_def, ) = args
304        ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
305        job_id = queue.SubmitJob(ops)
306        _LogNewJob(True, job_id, ops)
307        return job_id
308  
309      elif method == luxi.REQ_PICKUP_JOB:
310        logging.info("Picking up new job from queue")
311        (job_id, ) = args
312        queue.PickupJob(job_id)
313  
314      elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
315        logging.info("Forcefully receiving new job")
316        (job_def, ) = args
317        ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
318        job_id = queue.SubmitJobToDrainedQueue(ops)
319        _LogNewJob(True, job_id, ops)
320        return job_id
321  
322      elif method == luxi.REQ_SUBMIT_MANY_JOBS:
323        logging.info("Receiving multiple jobs")
324        (job_defs, ) = args
325        jobs = []
326        for ops in job_defs:
327          jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
328        job_ids = queue.SubmitManyJobs(jobs)
329        for ((status, job_id), ops) in zip(job_ids, jobs):
330          _LogNewJob(status, job_id, ops)
331        return job_ids
332  
333      elif method == luxi.REQ_CANCEL_JOB:
334        (job_id, ) = args
335        logging.info("Received job cancel request for %s", job_id)
336        return queue.CancelJob(job_id)
337  
338      elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
339        (job_id, priority) = args
340        logging.info("Received request to change priority for job %s to %s",
341                     job_id, priority)
342        return queue.ChangeJobPriority(job_id, priority)
343  
344      elif method == luxi.REQ_ARCHIVE_JOB:
345        (job_id, ) = args
346        logging.info("Received job archive request for %s", job_id)
347        return queue.ArchiveJob(job_id)
348  
349      elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
350        (age, timeout) = args
351        logging.info("Received job autoarchive request for age %s, timeout %s",
352                     age, timeout)
353        return queue.AutoArchiveJobs(age, timeout)
354  
355      elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
356        (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
357        logging.info("Received job poll request for %s", job_id)
358        return queue.WaitForJobChanges(job_id, fields, prev_job_info,
359                                       prev_log_serial, timeout)
360  
361      elif method == luxi.REQ_QUERY:
362        (what, fields, qfilter) = args
363  
364        if what in constants.QR_VIA_OP:
365          result = self._Query(opcodes.OpQuery(what=what, fields=fields,
366                                               qfilter=qfilter))
367        elif what == constants.QR_LOCK:
368          if qfilter is not None:
369            raise errors.OpPrereqError("Lock queries can't be filtered",
370                                       errors.ECODE_INVAL)
371          return context.glm.QueryLocks(fields)
372        elif what == constants.QR_JOB:
373          return queue.QueryJobs(fields, qfilter)
374        elif what in constants.QR_VIA_LUXI:
375          luxi_client = runtime.GetClient(query=True)
376          result = luxi_client.Query(what, fields, qfilter).ToDict()
377        else:
378          raise errors.OpPrereqError("Resource type '%s' unknown" % what,
379                                     errors.ECODE_INVAL)
380  
381        return result
382  
383      elif method == luxi.REQ_QUERY_FIELDS:
384        (what, fields) = args
385        req = objects.QueryFieldsRequest(what=what, fields=fields)
386  
387        try:
388          fielddefs = query.ALL_FIELDS[req.what]
389        except KeyError:
390          raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
391                                     errors.ECODE_INVAL)
392  
393        return query.QueryFields(fielddefs, req.fields)
394  
395      elif method == luxi.REQ_QUERY_JOBS:
396        (job_ids, fields) = args
397        if isinstance(job_ids, (tuple, list)) and job_ids:
398          msg = utils.CommaJoin(job_ids)
399        else:
400          msg = str(job_ids)
401        logging.info("Received job query request for %s", msg)
402        return queue.OldStyleQueryJobs(job_ids, fields)
403  
404      elif method == luxi.REQ_QUERY_CONFIG_VALUES:
405        (fields, ) = args
406        logging.info("Received config values query request for %s", fields)
407        op = opcodes.OpClusterConfigQuery(output_fields=fields)
408        return self._Query(op)
409  
410      elif method == luxi.REQ_QUERY_CLUSTER_INFO:
411        logging.info("Received cluster info query request")
412        op = opcodes.OpClusterQuery()
413        return self._Query(op)
414  
415      elif method == luxi.REQ_QUERY_TAGS:
416        (kind, name) = args
417        logging.info("Received tags query request")
418        op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
419        return self._Query(op)
420  
421      elif method == luxi.REQ_SET_DRAIN_FLAG:
422        (drain_flag, ) = args
423        logging.info("Received queue drain flag change request to %s",
424                     drain_flag)
425        return queue.SetDrainFlag(drain_flag)
426  
427      elif method == luxi.REQ_SET_WATCHER_PAUSE:
428        (until, ) = args
429  
430        return _SetWatcherPause(context, until)
431  
432      else:
433        logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
434        raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
435                                     " but not implemented" % method)
436  
437    def _Query(self, op):
438      """Runs the specified opcode and returns the result.
439  
440      """
441      # Queries don't have a job id
442      proc = mcpu.Processor(self.server.context, None, enable_locks=False)
443  
444      # TODO: Executing an opcode using locks will acquire them in blocking mode.
445      # Consider using a timeout for retries.
446      return proc.ExecOpCode(op, None)
447  
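Editorial aside (usage sketch, assuming a fully initialized MasterServer instance named server, i.e. setup_queue() has already run): handle_request is the single entry point for every LUXI call, so a synchronous query and a job submission both go through it. OpCode.__getstate__ is used below on the assumption that it produces the same serialized dicts that OpCode.LoadOpCode accepts.

    client_ops = ClientOps(server)

    # a query request: runs OpClusterQuery synchronously via _Query
    cluster_info = client_ops.handle_request(luxi.REQ_QUERY_CLUSTER_INFO, [])

    # a job submission: args is a one-element list holding serialized opcodes
    job_def = [opcodes.OpClusterVerify().__getstate__()]
    job_id = client_ops.handle_request(luxi.REQ_SUBMIT_JOB, [job_def])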
448  
449  class GanetiContext(object):
450    """Context common to all ganeti threads.
451  
452    This class creates and holds common objects shared by all threads.
453  
454    """
455    # pylint: disable=W0212
456    # we do want to ensure a singleton here
457    _instance = None
458  
459    def __init__(self):
460      """Constructs a new GanetiContext object.
461  
462      There should be only one GanetiContext object at any time, so this
463      function raises an error if this is not the case.
464  
465      """
466      assert self.__class__._instance is None, "double GanetiContext instance"
467  
468      # Create global configuration object
469      self.cfg = config.ConfigWriter()
470  
471      # Locking manager
472      self.glm = locking.GanetiLockManager(
473        self.cfg.GetNodeList(),
474        self.cfg.GetNodeGroupList(),
475        [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
476        self.cfg.GetNetworkList())
477  
478      self.cfg.SetContext(self)
479  
480      # RPC runner
481      self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
482  
483      # Job queue
484      self.jobqueue = jqueue.JobQueue(self)
485  
486      # setting this also locks the class against attribute modifications
487      self.__class__._instance = self
488  
489    def __setattr__(self, name, value):
490      """Setting GanetiContext attributes is forbidden after initialization.
491  
492      """
493      assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
494      object.__setattr__(self, name, value)
495  
496    def AddNode(self, node, ec_id):
497      """Adds a node to the configuration and lock manager.
498  
499      """
500      # Add it to the configuration
501      self.cfg.AddNode(node, ec_id)
502  
503      # If preseeding fails it'll not be added
504      self.jobqueue.AddNode(node)
505  
506      # Add the new node to the Ganeti Lock Manager
507      self.glm.add(locking.LEVEL_NODE, node.uuid)
508      self.glm.add(locking.LEVEL_NODE_RES, node.uuid)
509  
510    def ReaddNode(self, node):
511      """Updates a node that's already in the configuration.
512  
513      """
514      # Synchronize the queue again
515      self.jobqueue.AddNode(node)
516  
517    def RemoveNode(self, node):
518      """Removes a node from the configuration and lock manager.
519  
520      """
521      # Remove node from configuration
522      self.cfg.RemoveNode(node.uuid)
523  
524      # Notify job queue
525      self.jobqueue.RemoveNode(node.name)
526  
527      # Remove the node from the Ganeti Lock Manager
528      self.glm.remove(locking.LEVEL_NODE, node.uuid)
529      self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
530  
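Editorial aside: the pair of asserts in __init__ and __setattr__ implements a "construct once, then freeze" pattern; once _instance is set on the class, any further instance attribute assignment trips the assertion. A self-contained miniature of the same mechanism (not Ganeti code, and only effective while assertions are enabled):

    class _FrozenSingleton(object):
      _instance = None

      def __init__(self):
        assert self.__class__._instance is None, "double instance"
        self.value = 42                   # still allowed: _instance is None
        self.__class__._instance = self   # class attribute; bypasses __setattr__

      def __setattr__(self, name, value):
        assert self.__class__._instance is None, "attempt to modify"
        object.__setattr__(self, name, value)

    obj = _FrozenSingleton()
    try:
      obj.value = 0                       # AssertionError: object is now frozen
    except AssertionError:
      pass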
531  
532  def _SetWatcherPause(context, until):
533    """Creates or removes the watcher pause file.
534  
535    @type context: L{GanetiContext}
536    @param context: Global Ganeti context
537    @type until: None or int
538    @param until: Unix timestamp saying until when the watcher shouldn't run
539  
540    """
541    node_names = context.cfg.GetNodeList()
542  
543    if until is None:
544      logging.info("Received request to no longer pause watcher")
545    else:
546      if not ht.TNumber(until):
547        raise TypeError("Duration must be numeric")
548  
549      if until < time.time():
550        raise errors.GenericError("Unable to set pause end time in the past")
551  
552      logging.info("Received request to pause watcher until %s", until)
553  
554    result = context.rpc.call_set_watcher_pause(node_names, until)
555  
556    errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
557                             for (node_name, nres) in result.items()
558                             if nres.fail_msg and not nres.offline)
559    if errmsg:
560      raise errors.OpExecError("Watcher pause was set where possible, but failed"
561                               " on the following node(s): %s" % errmsg)
562  
563    return until
564  
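Editorial aside (usage sketch; context stands for a live GanetiContext, so the calls are shown commented out rather than as directly runnable code): until is an absolute Unix timestamp and must pass the numeric and not-in-the-past checks above.

    until = int(time.time()) + 3600      # pause the watcher for one hour
    # _SetWatcherPause(context, until)   # issues RPCs to all nodes
    # _SetWatcherPause(context, None)    # lift the pause again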
565  
566  @rpc.RunWithRPC
567  def CheckAgreement():
568    """Check the agreement on who is the master.
569  
570    The function uses a very simple algorithm: we must get more positive
571    than negative answers. Since in most of the cases we are the master,
572    we'll use our own config file for getting the node list. In the
573    future we could collect the current node list from our (possibly
574    obsolete) known nodes.
575  
576    In order to account for cold-start of all nodes, we retry for up to
577    a minute until we get a real answer as the top-voted one. If the
578    nodes are more out of sync, manual startup of the master should be
579    attempted for now.
580  
581    Note that for a cluster with an even number of nodes, we need at least
582    half of the nodes (beside ourselves) to vote for us. This creates a
583    problem on two-node clusters, since in this case we require the
584    other node to be up too to confirm our status.
585  
586    """
587    myself = netutils.Hostname.GetSysName()
588    #temp instantiation of a config writer, used only to get the node list
589    cfg = config.ConfigWriter()
590    node_names = cfg.GetNodeNames(cfg.GetNodeList())
591    del cfg
592    retries = 6
593    while retries > 0:
594      votes = bootstrap.GatherMasterVotes(node_names)
595      if not votes:
596        # empty node list, this is a one node cluster
597        return True
598      if votes[0][0] is None:
599        retries -= 1
600        time.sleep(10)
601        continue
602      break
603    if retries == 0:
604      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
605                       " after multiple retries. Aborting startup")
606      logging.critical("Use the --no-voting option if you understand what"
607                       " effects it has on the cluster state")
608      return False
609    # here a real node is at the top of the list
610    all_votes = sum(item[1] for item in votes)
611    top_node, top_votes = votes[0]
612  
613    result = False
614    if top_node != myself:
615      logging.critical("It seems we are not the master (top-voted node"
616                       " is %s with %d out of %d votes)", top_node, top_votes,
617                       all_votes)
618    elif top_votes < all_votes - top_votes:
619      logging.critical("It seems we are not the master (%d votes for,"
620                       " %d votes against)", top_votes, all_votes - top_votes)
621    else:
622      result = True
623  
624    return result
625  
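Editorial aside: the decision rule coded above can be stated compactly as "we win only if we are the top-voted node and hold at least as many votes as all other answers combined". A small standalone restatement (not Ganeti code) with the same tie behaviour:

    def _won_election(votes, myself):
      # votes: list of (node_name, vote_count), sorted by count, highest first
      all_votes = sum(count for (_, count) in votes)
      top_node, top_votes = votes[0]
      return top_node == myself and top_votes >= all_votes - top_votes

    # Example: three-node cluster, two nodes name us master, one names another
    print(_won_election([("node1.example.com", 2), ("node2.example.com", 1)],
                        "node1.example.com"))   # True: 2 votes for, 1 against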
626  
627  @rpc.RunWithRPC
628  def ActivateMasterIP():
629    # activate ip
630    cfg = config.ConfigWriter()
631    master_params = cfg.GetMasterNetworkParameters()
632    ems = cfg.GetUseExternalMipScript()
633    runner = rpc.BootstrapRunner()
634    # we use the node name, as the configuration is only available here yet
635    result = runner.call_node_activate_master_ip(
636      cfg.GetNodeName(master_params.uuid), master_params, ems)
637  
638    msg = result.fail_msg
639    if msg:
640      logging.error("Can't activate master IP address: %s", msg)
641  
642  
643  def CheckMasterd(options, args):
644    """Initial checks whether to run or exit with a failure.
645  
646    """
647    if args:  # masterd doesn't take any arguments
648      print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
649      sys.exit(constants.EXIT_FAILURE)
650  
651    ssconf.CheckMaster(options.debug)
652  
653    try:
654      options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
655      options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
656    except KeyError:
657      print >> sys.stderr, ("User or group not existing on system: %s:%s" %
658                            (constants.MASTERD_USER, constants.DAEMONS_GROUP))
659      sys.exit(constants.EXIT_FAILURE)
660  
661    # Determine static runtime architecture information
662    runtime.InitArchInfo()
663  
664    # Check the configuration is sane before anything else
665    try:
666      config.ConfigWriter()
667    except errors.ConfigVersionMismatch, err:
668      v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
669      v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
670      print >> sys.stderr,  \
671        ("Configuration version mismatch. The current Ganeti software"
672         " expects version %s, but the on-disk configuration file has"
673         " version %s. This is likely the result of upgrading the"
674         " software without running the upgrade procedure. Please contact"
675         " your cluster administrator or complete the upgrade using the"
676         " cfgupgrade utility, after reading the upgrade notes." %
677         (v1, v2))
678      sys.exit(constants.EXIT_FAILURE)
679    except errors.ConfigurationError, err:
680      print >> sys.stderr, \
681        ("Configuration error while opening the configuration file: %s\n"
682         "This might be caused by an incomplete software upgrade or"
683         " by a corrupted configuration file. Until the problem is fixed"
684         " the master daemon cannot start." % str(err))
685      sys.exit(constants.EXIT_FAILURE)
686  
687    # If CheckMaster didn't fail we believe we are the master, but we have to
688    # confirm with the other nodes.
689    if options.no_voting:
690      if not options.yes_do_it:
691        sys.stdout.write("The 'no voting' option has been selected.\n")
692        sys.stdout.write("This is dangerous, please confirm by"
693                         " typing uppercase 'yes': ")
694        sys.stdout.flush()
695  
696        confirmation = sys.stdin.readline().strip()
697        if confirmation != "YES":
698          print >> sys.stderr, "Aborting."
699          sys.exit(constants.EXIT_FAILURE)
700  
701    else:
702      # CheckAgreement uses RPC and threads, hence it needs to be run in
703      # a separate process before we call utils.Daemonize in the current
704      # process.
705      if not utils.RunInSeparateProcess(CheckAgreement):
706        sys.exit(constants.EXIT_FAILURE)
707  
708      # ActivateMasterIP also uses RPC/threads, so we run it again via a
709      # separate process.
710  
711      # TODO: decide whether failure to activate the master IP is a fatal error
712      utils.RunInSeparateProcess(ActivateMasterIP)
713  
714  
715  def PrepMasterd(options, _):
716    """Prep master daemon function, executed with the PID file held.
717  
718    """
719    # This is safe to do as the pid file guarantees against
720    # concurrent execution.
721    utils.RemoveFile(pathutils.MASTER_SOCKET)
722  
723    mainloop = daemon.Mainloop()
724    master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
725    return (mainloop, master)
726  
727  
728  def ExecMasterd(options, args, prep_data):  # pylint: disable=W0613
729    """Main master daemon function, executed with the PID file held.
730  
731    """
732    (mainloop, master) = prep_data
733    try:
734      rpc.Init()
735      try:
736        master.setup_queue()
737        try:
738          mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
739        finally:
740          master.server_cleanup()
741      finally:
742        rpc.Shutdown()
743    finally:
744      utils.RemoveFile(pathutils.MASTER_SOCKET)
745  
746    logging.info("Clean master daemon shutdown")
747  
748  
749  def Main():
750    """Main function"""
751    parser = OptionParser(description="Ganeti master daemon",
752                          usage="%prog [-f] [-d]",
753                          version="%%prog (ganeti) %s" %
754                          constants.RELEASE_VERSION)
755    parser.add_option("--no-voting", dest="no_voting",
756                      help="Do not check that the nodes agree on this node"
757                      " being the master and start the daemon unconditionally",
758                      default=False, action="store_true")
759    parser.add_option("--yes-do-it", dest="yes_do_it",
760                      help="Override interactive check for --no-voting",
761                      default=False, action="store_true")
762    daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
763                       ExecMasterd, multithreaded=True)
764  