
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
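  # W0221: Arguments number differs from overridden method; RunTask receives
  # the server, the message and the client instead of the generic arguments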
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
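    """Initializes this object.

    The connection is handled as a LUXI message stream, using
    constants.LUXI_EOM as the end-of-message terminator.

    """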
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
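    """Handles an incoming LUXI message.

    The message is not processed here; it is queued as a task for the
    server's client request worker pool.

    """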
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck(object):
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
      repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Create the socket under a temporary name with the desired mode and
    # ownership, then rename it into place so clients never see a socket
    # with the wrong permissions.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
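    """Handles a new connection to the master socket.

    Each connection is wrapped in a MasterClientHandler, which takes care
    of reading requests and dispatching them to the worker pool.

    """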
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
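    """Creates the Ganeti context and the client request worker pool.

    Worker threads are only started here, after the daemon has forked,
    rather than in the constructor.

    """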
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
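    """Initializes this class and keeps a reference to the master server.

    """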
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
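    """Handles a single LUXI request.

    Depending on the method, the request is forwarded to the job queue,
    answered directly, or turned into an opcode and executed via _Query.

    """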
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
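  """Activates the master IP address.

  The actual work is delegated via RPC to the node daemon on the master
  node, optionally using the external master IP setup script.

  """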
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here yet
  result = runner.call_node_activate_master_ip(
    cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

    # ActivateMasterIP also uses RPC/threads, so we run it again via a
    # separate process.

    # TODO: decide whether failure to activate the master IP is a fatal error
    utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)