Package ganeti :: Package rpc :: Module client
[hide private]
[frames | no frames]

Source Code for Module ganeti.rpc.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for generic RPC clients. 
 32   
 33  """ 
 34   
 35  import logging 
 36   
 37  from ganeti import pathutils 
 38  import ganeti.rpc.transport as t 
 39   
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError) 
 43  from ganeti import serializer 
 44   
 45  KEY_METHOD = constants.LUXI_KEY_METHOD 
 46  KEY_ARGS = constants.LUXI_KEY_ARGS 
 47  KEY_SUCCESS = constants.LUXI_KEY_SUCCESS 
 48  KEY_RESULT = constants.LUXI_KEY_RESULT 
 49  KEY_VERSION = constants.LUXI_KEY_VERSION 
 50   
 51   
def ParseRequest(msg):
  """Parses a request message.

  @param msg: the serialized request, as accepted by
      C{serializer.LoadJson}
  @return: a tuple C{(method, args, version)}; C{version} is C{None} if
      the request does not contain one
  @raise ProtocolError: if the message cannot be parsed, does not
      deserialize to a dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    # "as" form instead of the Python-2-only "except X, err" spelling
    raise ProtocolError("Invalid RPC request (parsing error): %s" % err)

  logging.debug("RPC request: %s", request)

  if not isinstance(request, dict):
    logging.error("RPC request not a dict: %r", msg)
    raise ProtocolError("Invalid RPC request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  # The version is optional, method and arguments are not
  if method is None or args is None:
    logging.error("RPC request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid RPC request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
77 78
def ParseResponse(msg):
  """Parses a response message.

  @param msg: the serialized response, as accepted by
      C{serializer.LoadJson}
  @return: a tuple C{(success, result, version)}; C{version} is C{None}
      if the response does not contain one
  @raise ProtocolError: if the message cannot be deserialized, or is not
      a dictionary containing both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never convert a user interrupt into a protocol error
    raise
  except Exception as err:
    # "as" form instead of the Python-2-only "except X, err" spelling
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
99 100
def FormatResponse(success, result, version=None):
  """Formats a response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: if not C{None}, stored under the version key
  @return: the response dictionary as serialized by
      C{serializer.DumpJson}

  """
  fields = [
    (KEY_SUCCESS, success),
    (KEY_RESULT, result),
    ]

  # The version is only transmitted when the caller supplied one
  if version is not None:
    fields.append((KEY_VERSION, version))

  response = dict(fields)

  logging.debug("RPC response: %s", response)

  return serializer.DumpJson(response)
116 117
def FormatRequest(method, args, version=None):
  """Formats a request message.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @param version: if not C{None}, stored under the version key
  @return: the request dictionary as serialized by
      C{serializer.DumpJson}

  """
  # Mandatory part of every request
  request = {KEY_METHOD: method, KEY_ARGS: args}

  # Optional part, left out entirely when no version was given
  if version is not None:
    request.update({KEY_VERSION: version})

  return serializer.DumpJson(request)
133 134
def CallRPCMethod(transport_cb, method, args, version=None):
  """Send a RPC request via a transport and return the response.

  @param transport_cb: callable taking the serialized request and
      returning the serialized response
  @param method: the method to put into the request
  @param args: the arguments to put into the request
  @param version: the version to put into the request and to compare the
      response version against; C{None} sends no version
  @return: the result part of the response, if the response is marked as
      successful
  @raise LuxiError: if the response carries a version different from
      C{version}
  @raise RequestError: if the response is not marked as successful and
      C{errors.MaybeRaise} did not raise on its result

  """
  assert callable(transport_cb)

  # Serialize the request, hand it to the transport and decode the answer
  reply_msg = transport_cb(FormatRequest(method, args, version=version))
  (ok, payload, reply_version) = ParseResponse(reply_msg)

  # A response without a version is accepted as it is
  if reply_version is not None:
    if reply_version != version:
      raise LuxiError("RPC version mismatch, client %s, response %s" %
                      (version, reply_version))

  if not ok:
    # MaybeRaise is called first; RequestError is raised only if it returns
    errors.MaybeRaise(payload)
    raise RequestError(payload)

  return payload
158 159
class AbstractClient(object):
  """High-level client abstraction.

  Data serialization/deserialization is implemented on top of a backing
  Transport-like object. That object is dropped whenever a call fails and
  is created again on the next call.

  """

  def __init__(self, address=None, timeouts=None,
               transport=t.Transport):
    """Constructor for the Client class.

    If C{timeouts} is not passed, the default timeouts of the transport
    class are used.

    @param address: a valid address for the used transport class; if
        C{None}, C{pathutils.MASTER_SOCKET} is used
    @param timeouts: a list of timeouts, to be used on connect and
        read/write
    @param transport: a Transport-like class

    """
    self.address = pathutils.MASTER_SOCKET if address is None else address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()
    # The version used in RPC communication, by default unused:
    self.version = None

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    Does nothing while a transport object is already in place.

    """
    if self.transport is not None:
      return
    self.transport = self.transport_class(self.address,
                                          timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    The transport attribute is reset before the old object is closed, so
    it is C{None} afterwards even if closing fails.

    """
    if self.transport is not None:
      try:
        stale = self.transport
        self.transport = None
        stale.Close()
      except Exception: # pylint: disable=W0703
        pass

  def _SendMethodCall(self, data):
    """Send serialized data over the transport and return the reply.

    On any exception the transport is closed before the exception is
    re-raised.

    @param data: the serialized request, passed to the transport's
        C{Call} method
    @return: whatever the transport's C{Call} method returns

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      reply = self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise
    return reply

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def close(self):
    """Same as L{Close}, to be used with contextlib.closing(...).

    """
    self.Close()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: the method to call
    @param args: the method arguments; must be a list or a tuple
    @return: the value returned by L{CallRPCMethod}
    @raise errors.ProgrammerError: if C{args} is neither a list nor a
        tuple

    """
    if isinstance(args, (list, tuple)):
      return CallRPCMethod(self._SendMethodCall, method, args,
                           version=self.version)

    raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                 " expected list, got %s" % type(args))
243