Package ganeti :: Package rpc :: Module client
[hide private]
[frames | no frames]

Source Code for Module ganeti.rpc.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for generic RPC clients. 
 32   
 33  """ 
 34   
 35  import logging 
 36  import time 
 37   
 38  import ganeti.rpc.transport as t 
 39   
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError) 
 43  from ganeti import serializer 
 44   
 45  KEY_METHOD = constants.LUXI_KEY_METHOD  # request key holding the method name
 46  KEY_ARGS = constants.LUXI_KEY_ARGS  # request key holding the call arguments
 47  KEY_SUCCESS = constants.LUXI_KEY_SUCCESS  # response key holding the success flag
 48  KEY_RESULT = constants.LUXI_KEY_RESULT  # response key holding the result (or error)
 49  KEY_VERSION = constants.LUXI_KEY_VERSION  # optional key, in requests and responses
 50   
 51   
def ParseRequest(msg):
  """Parses a request message.

  @param msg: the serialized request, in the form accepted by
      L{serializer.LoadJson}
  @rtype: tuple
  @return: C{(method, args, version)}; C{version} is C{None} when the
      request carries no version entry
  @raise ProtocolError: if the message cannot be deserialized, does not
      deserialize to a dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    # "except ... as" is valid on Python 2.6+ and Python 3 alike, unlike
    # the comma form
    raise ProtocolError("Invalid RPC request (parsing error): %s" % err)

  logging.debug("RPC request: %s", request)

  if not isinstance(request, dict):
    logging.error("RPC request not a dict: %r", msg)
    raise ProtocolError("Invalid RPC request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  # The version is optional, but method and arguments are mandatory
  if method is None or args is None:
    logging.error("RPC request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid RPC request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
77 78
def ParseResponse(msg):
  """Parses a response message.

  @param msg: the serialized response, in the form accepted by
      L{serializer.LoadJson}
  @rtype: tuple
  @return: C{(success, result, version)}; C{version} is C{None} when the
      response carries no version entry
  @raise ProtocolError: if the message cannot be deserialized, or if it is
      not a dictionary containing both the success and the result entries

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never turn a user interrupt into a protocol error
    raise
  except Exception as err:
    # "except ... as" is valid on Python 2.6+ and Python 3 alike, unlike
    # the comma form
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
99 100
def FormatResponse(success, result, version=None):
  """Builds and serializes a response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: if not C{None}, stored under the version key
  @return: the response dictionary as serialized by L{serializer.DumpJson}

  """
  payload = {}
  payload[KEY_SUCCESS] = success
  payload[KEY_RESULT] = result

  # The version entry is only present when a version was given
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("RPC response: %s", payload)

  return serializer.DumpJson(payload)
116 117
def FormatRequest(method, args, version=None):
  """Builds and serializes a request message.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @param version: if not C{None}, stored under the version key
  @return: the request dictionary as serialized by L{serializer.DumpJson},
      using L{serializer.EncodeWithPrivateFields} as private encoder

  """
  # Collect the entries of the request
  entries = [(KEY_METHOD, method), (KEY_ARGS, args)]

  # The version entry is only present when a version was given
  if version is not None:
    entries.append((KEY_VERSION, version))

  # Serialize the request
  encoder = serializer.EncodeWithPrivateFields
  return serializer.DumpJson(dict(entries), private_encoder=encoder)
134 135
def CallRPCMethod(transport_cb, method, args, version=None):
  """Sends an RPC request through a transport callback and returns the result.

  The time spent formatting the request, waiting for the callback and
  parsing the response is logged at info level, in milliseconds.

  @type transport_cb: callable
  @param transport_cb: called with the serialized request; its return value
      is parsed as the serialized response
  @param method: the method to call
  @param args: the arguments of the call
  @param version: the version sent along with the request; compared against
      the version of the response, if the response has one
  @return: the result entry of the response, if the call succeeded
  @raise LuxiError: if the response carries a version different from
      C{version}
  @raise RequestError: if the call failed and L{errors.MaybeRaise} did not
      raise anything for the result

  """
  assert callable(transport_cb)

  started = time.time() * 1000
  outgoing = FormatRequest(method, args, version=version)
  formatted = time.time() * 1000

  # Send request and wait for response
  incoming = transport_cb(outgoing)
  received = time.time() * 1000

  (success, result, resp_version) = ParseResponse(incoming)
  parsed = time.time() * 1000

  format_ms = int(formatted - started)
  sock_ms = int(received - formatted)
  parse_ms = int(parsed - received)
  logging.info("CallRPCMethod %s: format: %dms, sock: %dms, parse: %dms",
               method, format_ms, sock_ms, parse_ms)

  # Verify version if there was one in the response
  if resp_version is not None:
    if resp_version != version:
      raise LuxiError("RPC version mismatch, client %s, response %s" %
                      (version, resp_version))

  if not success:
    # Give the errors module the chance to raise first; fall back to a
    # generic request error
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
161 162
class AbstractClient(object):
  """High-level RPC client abstraction.

  Request serialization and response deserialization are implemented on
  top of a backing Transport-like class, which is instantiated on demand.

  """

  def __init__(self, timeouts=None, transport=t.Transport,
               allow_non_master=False):
    """Constructor for the Client class.

    The transport itself is not created here, but when first needed.
    C{timeouts} is handed to the transport class as given, also when it
    is C{None}.

    @type timeouts: list of ints
    @param timeouts: timeouts to be used on connect and read/write
    @type transport: L{Transport} or another compatible class
    @param transport: the underlying transport to use for the RPC calls
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors

    """
    self.transport_class = transport
    self.timeouts = timeouts
    self.allow_non_master = allow_non_master
    # No connection yet, see _InitTransport
    self.transport = None
    # The version used in RPC communication, by default unused:
    self.version = None

  def _GetAddress(self):
    """Returns the socket address.

    Not implemented here; has to be overridden.

    """
    raise NotImplementedError

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    Nothing happens if a transport already exists.

    """
    if self.transport is not None:
      return

    factory = self.transport_class
    address = self._GetAddress()
    self.transport = factory(address,
                             timeouts=self.timeouts,
                             allow_non_master=self.allow_non_master)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    The transport is forgotten before it is closed, hence it is gone even
    if closing it fails.

    """
    stale = self.transport
    if stale is None:
      return

    self.transport = None
    try:
      stale.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Sends serialized data over the transport and returns the reply.

    The actual work is delegated to L{t.Transport.RetryOnNetworkError},
    which is handed one callback doing the call and one callback closing
    the transport.

    @param data: the serialized request
    @return: whatever C{RetryOnNetworkError} returns

    """
    # Send request and wait for response
    def attempt(try_no):
      # A non-zero try number is reported as a retry
      if try_no:
        logging.debug("RPC peer disconnected, retrying")
      self._InitTransport()
      return self.transport.Call(data)

    def discard(_):
      return self._CloseTransport()

    return t.Transport.RetryOnNetworkError(attempt, discard)

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def close(self):
    """Same as L{Close}, to be used with contextlib.closing(...).

    """
    self.Close()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: the method to call
    @type args: list or tuple
    @param args: the arguments of the call
    @return: the result of L{CallRPCMethod}
    @raise errors.ProgrammerError: if C{args} is neither a list nor a tuple

    """
    if isinstance(args, (list, tuple)):
      return CallRPCMethod(self._SendMethodCall, method, args,
                           version=self.version)

    raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                 " expected list, got %s" % type(args))
253 254
class AbstractStubClient(AbstractClient):
  """An abstract Client connecting a generated stub client to a L{Transport}.

  Subclasses should inherit from this class first and from a designated
  stub second.

  """

  def __init__(self, timeouts=None, transport=t.Transport,
               allow_non_master=None):
    """Constructor for the class.

    All arguments are handed unchanged to the constructor of
    L{AbstractClient}.

    @type timeouts: list of ints
    @param timeouts: timeouts to be used on connect and read/write
    @type transport: L{Transport} or another compatible class
    @param transport: the underlying transport to use for the RPC calls
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors

    """
    parent = super(AbstractStubClient, self)
    parent.__init__(timeouts=timeouts,
                    transport=transport,
                    allow_non_master=allow_non_master)

  def _GenericInvoke(self, method, *args):
    """Calls a method, passing the positional arguments as a tuple.

    """
    reply = self.CallMethod(method, args)
    return reply

  def _GetAddress(self):
    """Returns the socket address, as given by C{_GetSocketPath}.

    C{_GetSocketPath} is not defined in this class; presumably the stub
    class provides it.

    """
    socket_path = self._GetSocketPath() # pylint: disable=E1101
    return socket_path
286