Package ganeti :: Package rpc :: Module client
[hide private]
[frames | no frames]

Source Code for Module ganeti.rpc.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for generic RPC clients. 
 32   
 33  """ 
 34   
 35  import logging 
 36   
 37  import ganeti.rpc.transport as t 
 38   
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError) 
 42  from ganeti import serializer 
 43   
 # Short module-level aliases for the LUXI constants used below as the
 # dictionary keys of request and response messages.
 44  KEY_METHOD = constants.LUXI_KEY_METHOD 
 45  KEY_ARGS = constants.LUXI_KEY_ARGS 
 46  KEY_SUCCESS = constants.LUXI_KEY_SUCCESS 
 47  KEY_RESULT = constants.LUXI_KEY_RESULT 
 48  KEY_VERSION = constants.LUXI_KEY_VERSION 
 49   
 50   
def ParseRequest(msg):
    """Parses a request message.

    The message is deserialized with C{serializer.LoadJson} and must yield a
    dictionary holding non-C{None} values under the method and arguments
    keys. The version key is optional.

    @param msg: the serialized request
    @rtype: tuple
    @return: C{(method, args, version)}; C{version} is C{None} when the
        request does not carry one
    @raise ProtocolError: if deserialization raises C{ValueError}, if the
        decoded value is not a dictionary, or if the method or the
        arguments are missing (or C{None})

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        # "except X as err" is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form.
        raise ProtocolError("Invalid RPC request (parsing error): %s" % err)

    logging.debug("RPC request: %s", request)

    if not isinstance(request, dict):
        logging.error("RPC request not a dict: %r", msg)
        raise ProtocolError("Invalid RPC request (not a dict)")

    method = request.get(KEY_METHOD, None)  # pylint: disable=E1103
    args = request.get(KEY_ARGS, None)  # pylint: disable=E1103
    version = request.get(KEY_VERSION, None)  # pylint: disable=E1103

    # Only method and arguments are mandatory; a missing version is passed
    # on to the caller as None.
    if method is None or args is None:
        logging.error("RPC request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid RPC request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args, version)
76 77
def ParseResponse(msg):
    """Parses a response message.

    The message is deserialized with C{serializer.LoadJson} and must yield a
    dictionary containing both the success and the result keys. The version
    key is optional.

    @param msg: the serialized response
    @rtype: tuple
    @return: C{(success, result, version)}; C{version} is C{None} when the
        response does not carry one
    @raise ProtocolError: if deserialization fails with any C{Exception}, or
        if the decoded value is not a dictionary with the success and result
        keys

    """
    # Parse the result
    try:
        data = serializer.LoadJson(msg)
    except KeyboardInterrupt:
        # Never turn a user interrupt into a protocol error.
        raise
    except Exception as err:
        # "except X as err" is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form.
        raise ProtocolError("Error while deserializing response: %s" %
                            str(err))

    # Validate response
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % data)

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None))  # pylint: disable=E1103
98 99
def FormatResponse(success, result, version=None):
    """Builds and serializes a response message.

    @param success: value stored under the success key
    @param result: value stored under the result key
    @param version: optional version; the version key is only added when
        this is not C{None}
    @return: the response dictionary as serialized by C{serializer.DumpJson}

    """
    payload = {KEY_SUCCESS: success, KEY_RESULT: result}
    if version is not None:
        payload[KEY_VERSION] = version

    logging.debug("RPC response: %s", payload)
    serialized = serializer.DumpJson(payload)
    return serialized
115 116
def FormatRequest(method, args, version=None):
    """Builds and serializes a request message.

    @param method: value stored under the method key
    @param args: value stored under the arguments key
    @param version: optional version; the version key is only added when
        this is not C{None}
    @return: the request dictionary as serialized by C{serializer.DumpJson},
        using C{serializer.EncodeWithPrivateFields} as the private encoder

    """
    payload = {KEY_METHOD: method, KEY_ARGS: args}
    if version is not None:
        payload[KEY_VERSION] = version

    encoder = serializer.EncodeWithPrivateFields
    return serializer.DumpJson(payload, private_encoder=encoder)
133 134
def CallRPCMethod(transport_cb, method, args, version=None):
    """Sends an RPC request through a transport callback, returns the result.

    @param transport_cb: callable taking the serialized request and returning
        the serialized response
    @param method: the method to call
    @param args: the arguments of the call
    @param version: optional version put into the request and compared with
        the version of the response, if the response carries one
    @return: the result part of the response if the call succeeded
    @raise LuxiError: if the response has a version differing from C{version}
    @raise RequestError: if the response signals failure and
        C{errors.MaybeRaise} did not raise for the result itself

    """
    assert callable(transport_cb)

    # One round trip: serialize, hand over to the transport, decode the reply
    reply = transport_cb(FormatRequest(method, args, version=version))
    (success, result, resp_version) = ParseResponse(reply)

    # A response without version is accepted regardless of our own version
    if resp_version is not None:
        if resp_version != version:
            raise LuxiError("RPC version mismatch, client %s, response %s" %
                            (version, resp_version))

    if not success:
        errors.MaybeRaise(result)
        raise RequestError(result)

    return result
158 159
class AbstractClient(object):
    """High-level RPC client abstraction.

    Message serialization and deserialization are layered on top of a
    Transport-like backing class, which is instantiated lazily on first use.

    """

    def __init__(self, timeouts=None, transport=t.Transport,
                 allow_non_master=False):
        """Constructor for the Client class.

        No transport is created here; the arguments are only stored and
        later handed to the transport class. If C{timeouts} is not passed,
        the default timeouts of the transport class are used.

        @type timeouts: list of ints
        @param timeouts: timeouts to be used on connect and read/write
        @type transport: L{Transport} or another compatible class
        @param transport: the underlying transport to use for the RPC calls
        @type allow_non_master: bool
        @param allow_non_master: skip checks for the master node on errors

        """
        self.transport_class = transport
        self.timeouts = timeouts
        self.allow_non_master = allow_non_master
        # Created on demand by _InitTransport
        self.transport = None
        # The version used in RPC communication, by default unused:
        self.version = None

    def _GetAddress(self):
        """Returns the socket address; to be implemented by subclasses.

        """
        raise NotImplementedError

    def _InitTransport(self):
        """Creates the transport unless one is already set.

        """
        if self.transport is not None:
            return

        address = self._GetAddress()
        self.transport = self.transport_class(
            address,
            timeouts=self.timeouts,
            allow_non_master=self.allow_non_master)

    def _CloseTransport(self):
        """Drops and closes the current transport, ignoring errors.

        """
        current = self.transport
        if current is None:
            return

        # Forget the transport first, so that it is gone even if closing
        # fails
        self.transport = None
        try:
            current.Close()
        except Exception:  # pylint: disable=W0703
            pass

    def _SendMethodCall(self, data):
        """Sends serialized data over the transport and returns the reply.

        Retrying is delegated to C{t.Transport.RetryOnNetworkError}, which
        receives a callback that (re)initializes the transport and performs
        the call, and a callback that closes the transport.

        @param data: the serialized request
        @return: whatever C{t.Transport.RetryOnNetworkError} returns

        """
        def _Attempt(try_no):
            # A non-zero attempt number means this is not the first try
            if try_no:
                logging.debug("RPC peer disconnected, retrying")
            self._InitTransport()
            return self.transport.Call(data)

        def _Reset(_):
            return self._CloseTransport()

        return t.Transport.RetryOnNetworkError(_Attempt, _Reset)

    def Close(self):
        """Closes the underlying connection.

        """
        self._CloseTransport()

    def close(self):
        """Same as L{Close}, to be used with contextlib.closing(...).

        """
        self.Close()

    def CallMethod(self, method, args):
        """Sends a generic request and returns the response.

        @param method: the method to call
        @type args: list or tuple
        @param args: the arguments of the call
        @raise errors.ProgrammerError: if C{args} is neither a list nor a
            tuple

        """
        if isinstance(args, (list, tuple)):
            return CallRPCMethod(self._SendMethodCall, method, args,
                                 version=self.version)

        raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                     " expected list, got %s" % type(args))
250 251
class AbstractStubClient(AbstractClient):
    """Abstract client connecting a generated stub client to a L{Transport}.

    Concrete clients should inherit from this class first and from the
    designated stub class second. The address is taken from
    C{_GetSocketPath()}, which is not defined here and is presumably
    supplied by the stub class.

    """

    def __init__(self, timeouts=None, transport=t.Transport,
                 allow_non_master=None):
        """Constructor for the class.

        All arguments are forwarded unchanged to L{AbstractClient}. Note
        that C{allow_non_master} defaults to C{None} here rather than to
        C{False}.

        @type timeouts: list of ints
        @param timeouts: timeouts to be used on connect and read/write
        @type transport: L{Transport} or another compatible class
        @param transport: the underlying transport to use for the RPC calls
        @type allow_non_master: bool
        @param allow_non_master: skip checks for the master node on errors

        """
        base = super(AbstractStubClient, self)
        base.__init__(timeouts=timeouts,
                      transport=transport,
                      allow_non_master=allow_non_master)

    def _GenericInvoke(self, method, *args):
        """Calls C{method} via L{CallMethod}, passing the positional
        arguments as a tuple.

        """
        return self.CallMethod(method, args)

    def _GetAddress(self):
        """Returns the socket address as given by C{_GetSocketPath()}.

        """
        socket_path = self._GetSocketPath()  # pylint: disable=E1101
        return socket_path
283