Package ganeti :: Package server :: Module masterd
[hide private]
[frames] | no frames]

Source Code for Module ganeti.server.masterd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Master daemon program. 
 32   
  33  Some classes deviate from the standard style guide since the 
 34  inheritance from parent classes requires it. 
 35   
 36  """ 
 37   
 38  # pylint: disable=C0103 
 39  # C0103: Invalid name ganeti-masterd 
 40   
 41  import os 
 42  import sys 
 43  import socket 
 44  import time 
 45  import tempfile 
 46  import logging 
 47   
 48   
 49  from ganeti import config 
 50  from ganeti import constants 
 51  from ganeti import daemon 
 52  from ganeti import jqueue 
 53  from ganeti import luxi 
 54  import ganeti.rpc.errors as rpcerr 
 55  from ganeti import utils 
 56  from ganeti import errors 
 57  from ganeti import workerpool 
 58  import ganeti.rpc.node as rpc 
 59  import ganeti.rpc.client as rpccl 
 60  from ganeti import ht 
 61   
 62   
# Number of workers in the pool that processes client (LUXI) requests; used
# by MasterServer.setup_context when creating the "ClientReq" worker pool
CLIENT_REQUEST_WORKERS = 16

# Exit codes, re-exported from the constants module
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
def _LogNewJob(status, info, ops):
  """Log the outcome of a job submission.

  @param status: whether the job was submitted successfully
  @param info: the new job's ID on success, the failure reason otherwise
  @param ops: the job's opcodes; each one's C{Summary()} is included in the
      log line

  """
  summary = utils.CommaJoin(op.Summary() for op in ops)

  if not status:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
    return

  logging.info("New job with id %s, summary: %s", info, summary)
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker answering a single client request per task.

  """
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    The raw C{message} is parsed and handled through L{ClientOps}; the
    formatted response (success flag plus result or encoded error) is then
    sent back to C{client}.

    @param server: the L{MasterServer} that received the request
    @param message: the raw request as read from the client connection
    @param client: the L{MasterClientHandler} the reply is sent through

    """
    ops_handler = ClientOps(server)

    try:
      (method, args, version) = rpccl.ParseRequest(message)
    except rpcerr.ProtocolError as proto_err:
      # Unparseable request: log it and drop the connection without a reply
      logging.error("Protocol Error: %s", proto_err)
      client.close_log()
      return

    try:
      # A request may omit the LUXI version; only a given one is compared
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))
      response = ops_handler.handle_request(method, args)
    except errors.GenericError as gen_err:
      # Ganeti errors are serialized via errors.EncodeException
      logging.exception("Unexpected exception")
      (success, response) = (False, errors.EncodeException(gen_err))
    except: # pylint: disable=W0702
      # Deliberate catch-all: any other failure is reported as plain text
      logging.exception("Unexpected exception")
      exc_value = sys.exc_info()[1]
      (success, response) = (False, "Caught exception: %s" % str(exc_value))
    else:
      success = True

    try:
      client.send_message(rpccl.FormatResponse(success, response))
      # Wake up the main thread so that it writes out the queued reply
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  Every complete message received on the connection is handed, together with
  the server and this handler, to the server's request worker pool.

  """
  # Passed to the base class as its last constructor argument; going by the
  # name, the limit on received-but-unhandled messages (TODO confirm)
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    base_cls = daemon.AsyncTerminatedMessageStream
    base_cls.__init__(self, connected_socket, client_address,
                      constants.LUXI_EOM, family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queue C{message} for processing by a request worker.

    """
    master = self.server
    master.request_workers.AddTask((master, message, self))
141 142 -class _MasterShutdownCheck(object):
143 """Logic for master daemon shutdown. 144 145 """ 146 #: How long to wait between checks 147 _CHECK_INTERVAL = 5.0 148 149 #: How long to wait after all jobs are done (e.g. to give clients time to 150 #: retrieve the job status) 151 _SHUTDOWN_LINGER = 5.0 152
153 - def __init__(self):
154 """Initializes this class. 155 156 """ 157 self._had_active_jobs = None 158 self._linger_timeout = None
159
160 - def __call__(self, jq_prepare_result):
161 """Determines if master daemon is ready for shutdown. 162 163 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown} 164 @rtype: None or number 165 @return: None if master daemon is ready, timeout if the check must be 166 repeated 167 168 """ 169 if jq_prepare_result: 170 # Check again shortly 171 logging.info("Job queue has been notified for shutdown but is still" 172 " busy; next check in %s seconds", self._CHECK_INTERVAL) 173 self._had_active_jobs = True 174 return self._CHECK_INTERVAL 175 176 if not self._had_active_jobs: 177 # Can shut down as there were no active jobs on the first check 178 return None 179 180 # No jobs are running anymore, but maybe some clients want to collect some 181 # information. Give them a short amount of time. 182 if self._linger_timeout is None: 183 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True) 184 185 remaining = self._linger_timeout.Remaining() 186 187 logging.info("Job queue no longer busy; shutting down master daemon" 188 " in %s seconds", remaining) 189 190 # TODO: Should the master daemon socket be closed at this point? Doing so 191 # wouldn't affect existing connections. 192 193 if remaining < 0: 194 return None 195 else: 196 return remaining
197
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    The socket is bound under a temporary name in the directory of
    C{address}. It then gets its mode and ownership set and is renamed to
    C{address}, presumably so that the final path never refers to a socket
    with the wrong permissions. If any of these steps fails, the temporary
    socket is closed and removed before the error propagates.

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # NOTE(review): tempfile.mktemp only returns a currently unused name and
    # is documented as race-prone; binding a unix socket should fail rather
    # than reuse a path that appeared in the meantime.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)

    success = False
    try:
      os.chmod(temp_name, 0o770)
      os.chown(temp_name, uid, gid)
      os.rename(temp_name, address)
      success = True
    finally:
      if not success:
        # Best-effort cleanup so that no stray socket file or listener is
        # left behind; the original error keeps propagating
        try:
          self.close()
          os.unlink(temp_name)
        except EnvironmentError:
          logging.exception("Failed to clean up temporary socket %s",
                            temp_name)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    """Creates a L{MasterClientHandler} for a newly accepted connection.

    """
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_context(self):
    """Creates the global context and the client request worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    @rtype: None or number
    @return: None if the daemon may shut down now, otherwise the number of
      seconds after which to check again (see L{_MasterShutdownCheck})

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Each step runs even if an earlier one raised, so that for
    example the livelock is still closed when terminating the workers fails.

    """
    try:
      self.close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          try:
            self.context.jobqueue.Shutdown()
          finally:
            self.context.livelock.close()
class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  @staticmethod
  def _CheckArgCount(name, args, count):
    """Verifies that a request carries exactly the expected arguments.

    Without this check, a malformed request only fails while unpacking
    C{args}, with an unhelpful "too many values to unpack" style message.

    @type name: string
    @param name: request name, used in the error message
    @param args: the request's arguments (a tuple or list)
    @type count: int
    @param count: the number of arguments the request expects
    @raise ValueError: if C{len(args)} differs from C{count}

    """
    if len(args) != count:
      raise ValueError("Invalid number of arguments for request '%s':"
                       " expected %d, got %d" % (name, count, len(args)))

  @staticmethod
  def _PickupJob(args, queue):
    """Handles a job pickup request.

    @param args: a one-element sequence holding the job ID
    @param queue: the job queue
    @return: the job ID
    @raise ValueError: if C{args} does not contain exactly one element

    """
    logging.info("Picking up new job from queue")
    ClientOps._CheckArgCount("PickupJob", args, 1)
    (job_id, ) = args
    queue.PickupJob(job_id)
    return job_id

  @staticmethod
  def _ChangeJobPriority(args, queue):
    """Handles a job priority change request.

    @param args: a two-element sequence of job ID and new priority
    @param queue: the job queue
    @return: the result of the queue's C{ChangeJobPriority}
    @raise ValueError: if C{args} does not contain exactly two elements

    """
    ClientOps._CheckArgCount("ChangeJobPriority", args, 2)
    (job_id, priority) = args
    logging.info("Received request to change priority for job %s to %s",
                 job_id, priority)
    return queue.ChangeJobPriority(job_id, priority)

  def handle_request(self, method, args): # pylint: disable=R0911
    """Dispatches a client request to its handler.

    @param method: the request name; must be one of C{luxi.REQ_ALL}
    @param args: the request arguments, as a tuple or list
    @return: the handler's result (the job ID for a pickup request, the job
        queue's answer for a priority change)
    @raise ValueError: for malformed arguments or an unknown or unsupported
        request

    """
    queue = self.server.context.jobqueue

    # Argument counts are checked by the individual handlers; argument types
    # are not validated yet
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    if method == luxi.REQ_PICKUP_JOB:
      return self._PickupJob(args, queue)
    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      return self._ChangeJobPriority(args, queue)

    # A known request that is handled elsewhere, not by masterd
    logging.info("Request '%s' not supported by masterd", method)
    raise ValueError("Unsupported operation '%s'" % method)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads. Only a
  single instance may be created; once it has been constructed, setting
  attributes on any instance is refused.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self, livelock=None):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    @param livelock: the livelock object to use; when C{None}, a new
        C{utils.livelock.LiveLock("masterd")} is created

    """
    klass = self.__class__
    assert klass._instance is None, "double GanetiContext instance"

    # Create a livelock file unless the caller supplied one
    self.livelock = (utils.livelock.LiveLock("masterd")
                     if livelock is None else livelock)

    # The job queue is built from the current configuration
    cfg = self.GetConfig(None)
    logging.debug("Creating the job queue")
    self.jobqueue = jqueue.JobQueue(self, cfg)

    # Registering the singleton also freezes attributes (see __setattr__)
    klass._instance = self

  def __setattr__(self, name, value):
    """Sets an attribute, but only before the singleton is registered.

    @raise AssertionError: (when assertions are enabled) if a GanetiContext
        instance has already been fully constructed

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def GetWConfdContext(self, ec_id):
    """Returns C{config.GetWConfdContext} for C{ec_id} and our livelock.

    """
    return config.GetWConfdContext(ec_id, self.livelock)

  def GetConfig(self, ec_id):
    """Returns C{config.GetConfig} for C{ec_id} and our livelock.

    """
    return config.GetConfig(ec_id, self.livelock)

  # pylint: disable=R0201
  # method could be a function, but keep interface backwards compatible
  def GetRpc(self, cfg):
    """Returns a new RPC runner for configuration C{cfg}.

    """
    # The second argument is a no-op callable
    return rpc.RpcRunner(cfg, lambda _: None)

  def AddNode(self, cfg, node, ec_id):
    """Adds a node to the configuration and the job queue.

    """
    cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Only the job queue needs to be synchronized again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, cfg, node):
    """Removes a node from the configuration and the job queue.

    """
    # The configuration identifies nodes by UUID, the queue by name
    cfg.RemoveNode(node.uuid)
    self.jobqueue.RemoveNode(node.name)
def _SetWatcherPause(context, ec_id, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @param ec_id: execution context ID, passed to L{GanetiContext.GetConfig}
  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      C{None} requests that the watcher no longer be paused
  @rtype: None or number
  @return: C{until}
  @raise TypeError: if C{until} is neither C{None} nor numeric
  @raise errors.GenericError: if C{until} lies in the past
  @raise errors.OpExecError: if setting the pause failed on an online node

  """
  cfg = context.GetConfig(ec_id)
  node_names = cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  # GanetiContext has no "rpc" attribute (and refuses new attributes after
  # construction), so the RPC runner has to be obtained through GetRpc.
  runner = context.GetRpc(cfg)
  result = runner.call_set_watcher_pause(node_names, until)

  # Errors reported for offline nodes are ignored
  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until