Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37   
 38  from ganeti import serializer 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42   
 43   
 # Keys of the JSON dictionaries exchanged as LUXI requests and responses
 KEY_METHOD = "method"
 KEY_ARGS = "args"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 KEY_VERSION = "version"

 # Method names sent in the KEY_METHOD field of a request
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
 REQ_QUERY_JOBS = "QueryJobs"
 REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
 REQ_QUERY_LOCKS = "QueryLocks"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

 # Default connect timeout and read/write timeout, in seconds
 DEF_CTMO = 10
 DEF_RWTO = 60

 # WaitForJobChange timeout; kept below half of the read/write timeout.
 # Explicit floor division so the value is the same integer regardless of
 # the interpreter's "/" semantics.
 WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
  """Base class for the LUXI protocol errors defined in this module.

  """
76
class ConnectionClosedError(ProtocolError):
  """The connection was closed, e.g. while a message was being read.

  """
80
class TimeoutError(ProtocolError):
  """An operation (connect, send or receive) timed out.

  """
84
class RequestError(ProtocolError):
  """Error on request.

  Raised when the request itself was malformed or could not be handled;
  it does not cover failures of the requested operation (e.g. an error
  while starting up an instance).

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
97
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  Either the master daemon is not running or its socket has been
  removed.

  """
106
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  The user running the client lacks the rights needed to use the socket.

  """
114
115 116 -class Transport:
117 """Low-level transport class. 118 119 This is used on the client side. 120 121 This could be replace by any other class that provides the same 122 semantics to the Client. This means: 123 - can send messages and receive messages 124 - safe for multithreading 125 126 """ 127
128 - def __init__(self, address, timeouts=None):
129 """Constructor for the Client class. 130 131 Arguments: 132 - address: a valid address the the used transport class 133 - timeout: a list of timeouts, to be used on connect and read/write 134 135 There are two timeouts used since we might want to wait for a long 136 time for a response, but the connect timeout should be lower. 137 138 If not passed, we use a default of 10 and respectively 60 seconds. 139 140 Note that on reading data, since the timeout applies to an 141 invidual receive, it might be that the total duration is longer 142 than timeout value passed (we make a hard limit at twice the read 143 timeout). 144 145 """ 146 self.address = address 147 if timeouts is None: 148 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO 149 else: 150 self._ctimeout, self._rwtimeout = timeouts 151 152 self.socket = None 153 self._buffer = "" 154 self._msgs = collections.deque() 155 156 try: 157 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 158 159 # Try to connect 160 try: 161 utils.Retry(self._Connect, 1.0, self._ctimeout, 162 args=(self.socket, address, self._ctimeout)) 163 except utils.RetryTimeout: 164 raise TimeoutError("Connect timed out") 165 166 self.socket.settimeout(self._rwtimeout) 167 except (socket.error, NoMasterError): 168 if self.socket is not None: 169 self.socket.close() 170 self.socket = None 171 raise
172 173 @staticmethod
174 - def _Connect(sock, address, timeout):
175 sock.settimeout(timeout) 176 try: 177 sock.connect(address) 178 except socket.timeout, err: 179 raise TimeoutError("Connect timed out: %s" % str(err)) 180 except socket.error, err: 181 error_code = err.args[0] 182 if error_code in (errno.ENOENT, errno.ECONNREFUSED): 183 raise NoMasterError(address) 184 elif error_code in (errno.EPERM, errno.EACCES): 185 raise PermissionError(address) 186 elif error_code == errno.EAGAIN: 187 # Server's socket backlog is full at the moment 188 raise utils.RetryAgain() 189 raise
190
191 - def _CheckSocket(self):
192 """Make sure we are connected. 193 194 """ 195 if self.socket is None: 196 raise ProtocolError("Connection is closed")
197
198 - def Send(self, msg):
199 """Send a message. 200 201 This just sends a message and doesn't wait for the response. 202 203 """ 204 if constants.LUXI_EOM in msg: 205 raise ProtocolError("Message terminator found in payload") 206 207 self._CheckSocket() 208 try: 209 # TODO: sendall is not guaranteed to send everything 210 self.socket.sendall(msg + constants.LUXI_EOM) 211 except socket.timeout, err: 212 raise TimeoutError("Sending timeout: %s" % str(err))
213
214 - def Recv(self):
215 """Try to receive a message from the socket. 216 217 In case we already have messages queued, we just return from the 218 queue. Otherwise, we try to read data with a _rwtimeout network 219 timeout, and making sure we don't go over 2x_rwtimeout as a global 220 limit. 221 222 """ 223 self._CheckSocket() 224 etime = time.time() + self._rwtimeout 225 while not self._msgs: 226 if time.time() > etime: 227 raise TimeoutError("Extended receive timeout") 228 while True: 229 try: 230 data = self.socket.recv(4096) 231 except socket.error, err: 232 if err.args and err.args[0] == errno.EAGAIN: 233 continue 234 raise 235 except socket.timeout, err: 236 raise TimeoutError("Receive timeout: %s" % str(err)) 237 break 238 if not data: 239 raise ConnectionClosedError("Connection closed while reading") 240 new_msgs = (self._buffer + data).split(constants.LUXI_EOM) 241 self._buffer = new_msgs.pop() 242 self._msgs.extend(new_msgs) 243 return self._msgs.popleft()
244
245 - def Call(self, msg):
246 """Send a message and wait for the response. 247 248 This is just a wrapper over Send and Recv. 249 250 """ 251 self.Send(msg) 252 return self.Recv()
253
254 - def Close(self):
255 """Close the socket""" 256 if self.socket is not None: 257 self.socket.close() 258 self.socket = None
259
260 261 -def ParseRequest(msg):
262 """Parses a LUXI request message. 263 264 """ 265 try: 266 request = serializer.LoadJson(msg) 267 except ValueError, err: 268 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err) 269 270 logging.debug("LUXI request: %s", request) 271 272 if not isinstance(request, dict): 273 logging.error("LUXI request not a dict: %r", msg) 274 raise ProtocolError("Invalid LUXI request (not a dict)") 275 276 method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103 277 args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103 278 version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103 279 280 if method is None or args is None: 281 logging.error("LUXI request missing method or arguments: %r", msg) 282 raise ProtocolError(("Invalid LUXI request (no method or arguments" 283 " in request): %r") % msg) 284 285 return (method, args, version)
286
287 288 -def ParseResponse(msg):
289 """Parses a LUXI response message. 290 291 """ 292 # Parse the result 293 try: 294 data = serializer.LoadJson(msg) 295 except Exception, err: 296 raise ProtocolError("Error while deserializing response: %s" % str(err)) 297 298 # Validate response 299 if not (isinstance(data, dict) and 300 KEY_SUCCESS in data and 301 KEY_RESULT in data): 302 raise ProtocolError("Invalid response from server: %r" % data) 303 304 return (data[KEY_SUCCESS], data[KEY_RESULT], 305 data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
306
def FormatResponse(success, result, version=None):
  """Serializes a LUXI response.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: if not None, stored under the version key
  @return: the response as serialized by C{serializer.DumpJson}

  """
  payload = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)

  return serializer.DumpJson(payload)
323
def FormatRequest(method, args, version=None):
  """Serializes a LUXI request.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @param version: if not None, stored under the version key
  @return: the request as serialized by C{serializer.DumpJson} with
      indentation disabled

  """
  payload = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload, indent=False)
340
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: callable taking the serialized request and
      returning the serialized response
  @param method: name of the method to call
  @param args: arguments of the method
  @param version: protocol version put into the request and compared
      against the version in the response, if the response has one
  @return: the result part of a successful response
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the response signals a failure and
      C{errors.MaybeRaise} did not raise an exception for its result

  """
  assert callable(transport_cb)

  # Send request and wait for response
  response_msg = transport_cb(FormatRequest(method, args, version=version))
  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
365
class Client(object):
  """High-level client implementation.

  Requests are serialized and passed to a backing Transport-like object,
  which is created when needed and dropped after any failed call.

  """

  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    @param address: a valid address for the used transport class;
        defaults to C{constants.MASTER_SOCKET}
    @param timeouts: a list of timeouts, to be used on connect and
        read/write
    @param transport: a Transport-like class

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.transport = None
    self.transport_class = transport
    self.timeouts = timeouts
    self.address = address
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is not None:
      return
    self.transport = self.transport_class(self.address,
                                          timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    transp = self.transport
    if transp is None:
      return
    # Forget the transport first, so it is gone even if closing fails
    self.transport = None
    try:
      transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Sends a serialized request and returns the serialized response.

    On any error the transport is closed before the error is re-raised,
    so that the next call starts with a fresh transport.

    """
    try:
      self._InitTransport()
      response = self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise
    return response

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Calls C{SetDrainFlag} with the flag itself as arguments.

    """
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Calls C{SetWatcherPause} with a one-element argument list.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Calls C{SubmitJob} with the C{__getstate__()} of every opcode.

    """
    return self.CallMethod(REQ_SUBMIT_JOB,
                           [op.__getstate__() for op in ops])

  def SubmitManyJobs(self, jobs):
    """Calls C{SubmitManyJobs} with one list of opcode states per job.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Calls C{CancelJob} with the job ID as arguments.

    """
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Calls C{ArchiveJob} with the job ID as arguments.

    """
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Calls C{AutoArchiveJobs} with the age and a fixed timeout.

    The timeout is derived from the default read/write timeout.

    """
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS,
                           (age, (DEF_RWTO - 1) / 2))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT}
        will be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    capped_timeout = min(WFJC_TIMEOUT, timeout)
    request_args = (job_id, fields, prev_job_info, prev_log_serial,
                    capped_timeout)
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, request_args)

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job change is reported.

    Repeats L{WaitForJobChangeOnce} for as long as it returns
    C{constants.JOB_NOTCHANGED}.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def QueryJobs(self, job_ids, fields):
    """Calls C{QueryJobs} with (job_ids, fields).

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Calls C{QueryInstances} with (names, fields, use_locking).

    """
    query_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, query_args)

  def QueryNodes(self, names, fields, use_locking):
    """Calls C{QueryNodes} with (names, fields, use_locking).

    """
    query_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, query_args)

  def QueryExports(self, nodes, use_locking):
    """Calls C{QueryExports} with (nodes, use_locking).

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Calls C{QueryClusterInfo} without arguments.

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Calls C{QueryConfigValues} with the field list as arguments.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Calls C{QueryTags} with (kind, name).

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Calls C{QueryLocks} with (fields, sync).

    """
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
512