Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37   
 38  from ganeti import serializer 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42   
 43   
 44  KEY_METHOD = "method" 
 45  KEY_ARGS = "args" 
 46  KEY_SUCCESS = "success" 
 47  KEY_RESULT = "result" 
 48   
 49  REQ_SUBMIT_JOB = "SubmitJob" 
 50  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 51  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 52  REQ_CANCEL_JOB = "CancelJob" 
 53  REQ_ARCHIVE_JOB = "ArchiveJob" 
 54  REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs" 
 55  REQ_QUERY_JOBS = "QueryJobs" 
 56  REQ_QUERY_INSTANCES = "QueryInstances" 
 57  REQ_QUERY_NODES = "QueryNodes" 
 58  REQ_QUERY_EXPORTS = "QueryExports" 
 59  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 60  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 61  REQ_QUERY_TAGS = "QueryTags" 
 62  REQ_QUERY_LOCKS = "QueryLocks" 
 63  REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" 
 64  REQ_SET_WATCHER_PAUSE = "SetWatcherPause" 
 65   
 66  DEF_CTMO = 10 
 67  DEF_RWTO = 60 
 68   
 69  # WaitForJobChange timeout 
 70  WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 
class ProtocolError(errors.GenericError):
  """Base class for errors occurring in the LUXI protocol.

  """
75
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  """
79
class TimeoutError(ProtocolError):
  """An operation timed out.

  """
83
class RequestError(ProtocolError):
  """A request was rejected.

  The problem lies in the format of the request or in its handling, as
  opposed to (e.g.) a failure while starting up an instance.

  Typical conditions leading to this exception:
    - a job could not be submitted because its data was wrong
    - a query could not be run because required fields were missing

  """
96
class NoMasterError(ProtocolError):
  """The master is unreachable.

  Either the master daemon is not running or its socket has been
  removed.

  """
105
class PermissionError(ProtocolError):
  """Access to the master socket was denied while connecting.

  The user does not have the proper rights.

  """
113
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Creates a unix stream socket and connects it to C{address}.

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the
    read timeout).

    @param address: a valid address for the used transport class
    @param timeouts: a pair of timeouts, to be used on connect and on
        read/write respectively; if not passed, the defaults of 10 and
        respectively 60 seconds are used
    @raise TimeoutError: if connecting did not succeed within the
        connect timeout

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Any failure (including TimeoutError and PermissionError) leaves
      # the object unusable, hence the socket must not be leaked
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Try once to connect the socket, translating low-level errors.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout set on the socket before connecting
    @raise TimeoutError: if the connect call timed out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: on EAGAIN

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Same guard as in Recv: the error might carry no arguments
      if err.args:
        error_code = err.args[0]
      else:
        error_code = None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the message, which must not contain the end-of-message
        marker
    @raise ProtocolError: if the marker is found in the message or the
        connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the oldest queued message
    @raise ProtocolError: if the connection is closed
    @raise TimeoutError: if a single receive or the whole operation
        timed out
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          # socket.timeout is a subclass of socket.error, hence it has to
          # be handled first or this clause would never be reached
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last element is the (possibly empty) start of a message which
      # has not been terminated yet; keep it for the next round
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket, if it is still open.

    """
    if self.socket is not None:
      self.socket.close()
      self.socket = None
258
def ParseRequest(msg):
  """Decode a LUXI request message.

  @param msg: the serialized request
  @return: tuple of (method, args)
  @raise ProtocolError: if the message can not be parsed, is not a
      dictionary or lacks the method or the arguments

  """
  try:
    payload = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", payload)

  if not isinstance(payload, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = payload.get(KEY_METHOD) # pylint: disable-msg=E1103
  args = payload.get(KEY_ARGS) # pylint: disable-msg=E1103

  if method is not None and args is not None:
    return (method, args)

  logging.error("LUXI request missing method or arguments: %r", msg)
  raise ProtocolError(("Invalid LUXI request (no method or arguments"
                       " in request): %r") % msg)
284
def ParseResponse(msg):
  """Decode a LUXI response message.

  @param msg: the serialized response
  @return: tuple of (success, result)
  @raise ProtocolError: if the message can not be deserialized or is
      not a dictionary holding both the success and the result keys

  """
  try:
    decoded = serializer.LoadJson(msg)
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Both keys are mandatory in a well-formed response
  well_formed = (isinstance(decoded, dict) and
                 KEY_SUCCESS in decoded and
                 KEY_RESULT in decoded)
  if not well_formed:
    raise ProtocolError("Invalid response from server: %r" % decoded)

  return (decoded[KEY_SUCCESS], decoded[KEY_RESULT])
303
def FormatResponse(success, result):
  """Serialize a LUXI response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @return: the response as serialized by L{serializer.DumpJson}

  """
  response = {}
  response[KEY_SUCCESS] = success
  response[KEY_RESULT] = result

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
317
def FormatRequest(method, args):
  """Serialize a LUXI request message.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @return: the request as serialized by L{serializer.DumpJson}

  """
  request = {}
  request[KEY_METHOD] = method
  request[KEY_ARGS] = args

  return serializer.DumpJson(request, indent=False)
331
def CallLuxiMethod(transport_cb, method, args):
  """Run a LUXI request through a transport and hand back the result.

  @param transport_cb: callable taking the serialized request and
      returning the serialized response
  @param method: name of the method to call
  @param args: arguments of the method
  @return: the result part of a successful response
  @raise RequestError: if the response signals a failure and
      L{errors.MaybeRaise} did not raise for the result

  """
  assert callable(transport_cb)

  # Send request and wait for response
  reply = transport_cb(FormatRequest(method, args))

  (success, result) = ParseResponse(reply)

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
351
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    The transport is initialized right away.

    @param address: a valid address for the used transport class; if
        not passed, L{constants.MASTER_SOCKET} is used
    @param timeouts: a list of timeouts, to be used on connect and
        read/write; if not passed, the default timeouts of the
        transport class are used
    @param transport: a Transport-like class

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that it is dropped
      # even if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Send serialized data over the transport and return the response.

    On any error the transport is closed before the exception is
    re-raised; the next call creates a new one.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request with the given flag as arguments.

    """
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Send a SetWatcherPause request with C{until} as only argument.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submit a job made of the given opcodes.

    The opcodes are sent as the list of their C{__getstate__()} values.

    """
    # A list comprehension, as in SubmitManyJobs; unlike map() it yields
    # a real list on every Python version
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as a sequence of opcodes.

    """
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Send a CancelJob request for the given job ID.

    """
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for the given job ID.

    """
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request.

    The request carries C{age} and a timeout derived from the default
    read/write timeout.

    """
    # Explicit floor division, the timeout stays a whole number of seconds
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait for changes on a job, for as long as it takes.

    L{WaitForJobChangeOnce} is called repeatedly until its result differs
    from L{constants.JOB_NOTCHANGED}; that result is returned.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request for the given job IDs and fields.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a QueryClusterInfo request (it takes no arguments).

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request with the fields as arguments.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Send a QueryTags request for the given kind and name.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Send a QueryLocks request.

    """
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
497