
Source Code for Module ganeti.server.masterd

#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide, since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()

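
# Illustrative sketch (not part of the original module): the LUXI message
# round-trip that RunTask performs. The wire format is assumed to be the
# JSON encoding used by the luxi module, with "method"/"args"/"version"
# keys; the method name comes from the luxi.REQ_* constants.
def _ExampleLuxiRoundTrip():
  import json
  message = json.dumps({"method": luxi.REQ_QUERY_CLUSTER_INFO,
                        "args": [],
                        "version": constants.LUXI_VERSION})
  (method, args, version) = luxi.ParseRequest(message)
  assert method == luxi.REQ_QUERY_CLUSTER_INFO and args == []
  # A reply travels back the same way, wrapped by FormatResponse
  return luxi.FormatResponse(True, {"name": "cluster.example.com"})
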

class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))

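
# Conceptual model (simplified; not the real ganeti.workerpool API): a
# worker pool keeps a queue of argument tuples, and each tuple enqueued
# with AddTask is unpacked as the positional arguments of RunTask. That is
# why handle_message above enqueues (server, message, client).
def _ExamplePoolModel(run_task):
  import Queue
  import threading
  tasks = Queue.Queue()

  def worker():
    while True:
      args = tasks.get()  # e.g. (server, message, client)
      run_task(*args)     # mirrors ClientRequestWorker.RunTask

  for _ in range(CLIENT_REQUEST_WORKERS):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()
  return tasks.put        # calling this is analogous to AddTask
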

class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()

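
# A minimal sketch of how a client reaches this server: connect to the
# master's UNIX socket and send a LUXI message terminated by LUXI_EOM.
# Real clients should use luxi.Client instead; "request" is assumed to be
# a string such as the one built in the sketch after ClientRequestWorker.
def _ExampleConnect(request):
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  sock.connect(constants.MASTER_SOCKET)
  try:
    sock.sendall(request + constants.LUXI_EOM)
    reply = sock.recv(4096)  # real clients buffer until LUXI_EOM arrives
  finally:
    sock.close()
  return reply
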

class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)

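
# Hedged usage sketch: how the dispatcher above is driven. The "server"
# object is whatever MasterServer instance the worker was handed; the job
# ids and field names are illustrative only.
def _ExampleHandleRequest(server):
  ops = ClientOps(server)
  # Equivalent to a client calling the QueryJobs LUXI method
  return ops.handle_request(luxi.REQ_QUERY_JOBS,
                            ([1, 2], ["id", "status"]))
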

class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

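
# Minimal sketch of the write-once singleton pattern used by GanetiContext,
# with the Ganeti-specific members stubbed out: after the first (and only)
# instantiation, both re-instantiation and attribute assignment assert.
# Note that the final "_instance" write targets the class, so it bypasses
# the instance-level __setattr__ guard.
class _WriteOnceExample(object):
  _instance = None

  def __init__(self):
    assert self.__class__._instance is None, "double instance"
    self.value = 42                   # allowed: _instance is still None
    self.__class__._instance = self   # last write; locks the instance

  def __setattr__(self, name, value):
    assert self.__class__._instance is None, "attempt to modify"
    object.__setattr__(self, name, value)
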

def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until

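
# Usage sketch: pause the watcher for an hour, then lift the pause again;
# this mirrors what the REQ_SET_WATCHER_PAUSE branch of handle_request does.
def _ExampleWatcherPause():
  _SetWatcherPause(int(time.time()) + 3600)  # writes WATCHER_PAUSEFILE
  _SetWatcherPause(None)                     # removes the file again
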

@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result

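
# Worked example of the vote arithmetic above, using the same
# (node, votes) list shape returned by bootstrap.GatherMasterVotes: with
# five voting nodes, three of which name us, we keep the master role.
def _ExampleVoteMath():
  votes = [("node1.example.com", 3), ("node2.example.com", 2)]
  all_votes = sum(item[1] for item in votes)      # 5
  top_node, top_votes = votes[0]                  # highest-voted candidate
  # We lose only if strictly more votes went to other candidates
  return not (top_votes < all_votes - top_votes)  # 3 >= 5 - 3, so True
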

@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)