
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()

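# For orientation, a rough sketch of the round trip the worker above
# implements, using only names that appear in this module (this is an
# illustration, not additional masterd code; the exact wire encoding is
# owned by the luxi module):
#
#   (method, args, version) = luxi.ParseRequest(message)
#   result = ClientOps(server).handle_request(method, args)
#   reply = luxi.FormatResponse(True, result)
#   client.send_message(reply)
#   server.awaker.signal()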

class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1
  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()

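# A minimal sketch of what a client does to reach this server: connect a
# SOCK_STREAM socket to the UNIX address the MasterServer was bound to and
# speak the LUXI protocol over it. In practice this goes through the
# ganeti.luxi client code rather than by hand; the snippet below is
# illustrative only and assumes constants.MASTER_SOCKET is the address used
# in PrepMasterd.
#
#   import socket
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   sock.connect(constants.MASTER_SOCKET)
#   # ... send a LUXI request, read the response, then sock.close()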

class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQueryFields(what=req.what,
                                                   fields=req.fields))
      elif req.what == constants.QR_LOCK:
        return query.QueryFields(query.LOCK_FIELDS, req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

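  # Illustrative example of the dispatch above (not part of the method): a
  # cluster info request arrives as method == luxi.REQ_QUERY_CLUSTER_INFO
  # with empty args, is turned into opcodes.OpClusterQuery() and executed
  # through self._Query(), whose return value becomes the LUXI reply payload.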
  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

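# Note on GanetiContext above: once __init__ has stored the instance in
# _instance, the assertion in __setattr__ makes any further attribute
# assignment on the instance (and any second instantiation) fail. Roughly:
#
#   context = GanetiContext()
#   context.cfg = None   # AssertionError: Attempt to modify Ganeti Context
#   GanetiContext()      # AssertionError: double GanetiContext instance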

def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result

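# Worked example of the vote arithmetic in CheckAgreement (numbers are
# illustrative): with votes == [("node1", 3), ("node2", 2)], all_votes is 5
# and top_votes is 3, so the check "top_votes < all_votes - top_votes"
# becomes 3 < 2, which is false, and node1 is accepted as master provided
# it is ourselves. An exact tie (e.g. 2 votes for, 2 against) also passes,
# since "2 < 2" is false.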

@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

    # ActivateMasterIP also uses RPC/threads, so we run it again via a
    # separate process.

    # TODO: decide whether failure to activate the master IP is a fatal error
    utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

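# The nested try/finally blocks above make teardown mirror setup: when the
# mainloop stops, server_cleanup() shuts down the worker pool and job queue,
# then rpc.Shutdown() runs, and the master socket file is removed last.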

def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
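# Startup flow for reference: daemon.GenericMain parses the options with the
# parser built above, runs CheckMasterd (sanity and voting checks), then
# PrepMasterd (socket and mainloop setup, with the PID file held) and
# finally ExecMasterd, which drives the mainloop until shutdown.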