
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR

class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()

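Each task handed to ClientRequestWorker is one complete LUXI message: luxi.ParseRequest() unpacks it into a (method, args, version) triple, and luxi.FormatResponse() builds the reply. As a rough sketch of one round trip, assuming a JSON envelope with "method"/"args"/"version" and "success"/"result" keys (the exact wire format lives in the luxi module; the payloads below are illustrative assumptions):

    # Hypothetical on-the-wire payloads for a single request/response pair:
    request = '{"method": "QueryClusterInfo", "args": [], "version": null}'
    response = '{"success": true, "result": {"name": "cluster.example.com"}}'
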
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))

class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()

class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)

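ClientOps.handle_request() is the server half of the LUXI protocol; the ganeti.luxi module also ships the client half, which wraps each REQ_* constant in a convenience method. A minimal sketch of how a client could drive the dispatcher above, assuming luxi.Client connects to the master socket by default (treat the exact signatures as assumptions):

    from ganeti import luxi, opcodes

    client = luxi.Client()
    # REQ_SUBMIT_JOB: one job consisting of a single opcode
    job_id = client.SubmitJob([opcodes.OpQueryClusterInfo()])
    # REQ_QUERY_JOBS: poll the job's status
    print client.QueryJobs([job_id], ["status"])
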
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

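Since __setattr__() refuses writes once _instance is set, both a second instantiation and any post-construction attribute assignment fail their assertions. A short illustration of the resulting behaviour (assuming assertions are enabled):

    ctx = GanetiContext()  # first instance: allowed; then the class is locked
    GanetiContext()        # AssertionError: double GanetiContext instance
    ctx.cfg = None         # AssertionError: Attempt to modify Ganeti Context
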
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until

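As a worked example, a client pausing the watcher for one hour sends an absolute Unix timestamp, which this helper persists as a single decimal line; passing None undoes the pause:

    _SetWatcherPause(time.time() + 3600)  # pause file then holds e.g. "1320000000\n"
    _SetWatcherPause(None)                # removes the pause file again
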
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result

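The acceptance test in the last elif branch is a plain majority rule: out of all_votes answers, the votes for us must be at least as many as the votes against us. A small worked example using the same names:

    # Five nodes answered; three voted for node1, two for other candidates.
    all_votes = 5
    (top_node, top_votes) = ("node1", 3)
    # 3 votes for vs. 5 - 3 = 2 against, so node1 keeps the master role
    # (provided top_node is indeed ourselves).
    assert not top_votes < all_votes - top_votes
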
@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)

def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)

def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)

def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
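
For orientation, daemon.GenericMain() ties the three hooks together; a simplified sketch of the control flow, with the intermediate daemonization steps elided (see ganeti/daemon.py for the real sequence, which this only approximates):

    (options, args) = parser.parse_args()
    CheckMasterd(options, args)        # may sys.exit() before daemonizing
    # ... daemonize, set up logging, write and lock the PID file ...
    prep_data = PrepMasterd(options, args)
    ExecMasterd(options, args, prep_data)  # runs the mainloop until shutdown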