1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module for generic RPC clients.
32
33 """
34
35 import logging
36 import time
37
38 import ganeti.rpc.transport as t
39
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError)
43 from ganeti import serializer
44
# Short module-level aliases for the LUXI message keys defined in
# ganeti.constants. KEY_METHOD, KEY_ARGS and KEY_VERSION are the dictionary
# keys looked up in a decoded request; KEY_SUCCESS and KEY_RESULT are
# presumably their response-side counterparts (their use is not visible in
# this part of the file -- confirm).
KEY_METHOD = constants.LUXI_KEY_METHOD
KEY_ARGS = constants.LUXI_KEY_ARGS
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
KEY_RESULT = constants.LUXI_KEY_RESULT
KEY_VERSION = constants.LUXI_KEY_VERSION
50
51
def ParseRequest(msg):
  """Parses a request message.

  The message is decoded with L{serializer.LoadJson} and must yield a
  dictionary containing at least the method and the arguments.

  @param msg: serialized request, in the form accepted by
      L{serializer.LoadJson}
  @rtype: tuple
  @return: C{(method, args, version)}; C{version} is C{None} if the request
      does not contain one
  @raise ProtocolError: if the message cannot be decoded, does not decode
      to a dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    # "except ... as" is valid on Python 2.6+ as well as Python 3
    raise ProtocolError("Invalid RPC request (parsing error): %s" % err)

  logging.debug("RPC request: %s", request)

  if not isinstance(request, dict):
    logging.error("RPC request not a dict: %r", msg)
    raise ProtocolError("Invalid RPC request (not a dict)")

  method = request.get(KEY_METHOD)
  args = request.get(KEY_ARGS)
  # The version is optional; only method and arguments are mandatory
  version = request.get(KEY_VERSION)

  if method is None or args is None:
    logging.error("RPC request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid RPC request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
77
78
99
100
116
117
134
135
def CallRPCMethod(transport_cb, method, args, version=None):
  """Perform one RPC round trip through a transport callback.

  The request is built with L{FormatRequest}, handed to C{transport_cb} and
  the reply is decoded with L{ParseResponse}. The time spent in each of
  these three steps is logged in milliseconds.

  @type transport_cb: callable
  @param transport_cb: called with the formatted request, must return the
      response message
  @param method: method identifier, passed to L{FormatRequest} and used in
      the timing log line
  @param args: method arguments, passed to L{FormatRequest}
  @param version: version sent with the request; if the response carries a
      version it must be equal to this value
  @return: the result part of a successful response
  @raise LuxiError: if the response carries a different version
  @raise RequestError: if the call was not successful and
      L{errors.MaybeRaise} did not raise anything itself

  """
  assert callable(transport_cb)

  started = time.time()*1000
  request_msg = FormatRequest(method, args, version=version)
  formatted = time.time()*1000

  response_msg = transport_cb(request_msg)
  received = time.time()*1000

  (success, result, resp_version) = ParseResponse(response_msg)
  parsed = time.time()*1000

  format_ms = int(formatted-started)
  sock_ms = int(received-formatted)
  parse_ms = int(parsed-received)
  logging.info("CallRPCMethod %s: format: %dms, sock: %dms, parse: %dms",
               method, format_ms, sock_ms, parse_ms)

  # A response without a version is accepted as-is
  if resp_version is not None:
    if resp_version != version:
      raise LuxiError("RPC version mismatch, client %s, response %s" %
                      (version, resp_version))

  if not success:
    # Give the errors module the first chance to raise from the result
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
161
162
164 """High-level client abstraction.
165
166 This uses a backing Transport-like class on top of which it
167 implements data serialization/deserialization.
168
169 """
170
def __init__(self, timeouts=None, transport=t.Transport,
             allow_non_master=False):
  """Store the connection settings of the client.

  Nothing is connected here: the C{transport} attribute starts out as
  C{None}, and the C{version} attribute is initialised to C{None} as well.
  When no timeouts are given, the defaults of the transport class apply.

  @type timeouts: list of ints
  @param timeouts: timeouts to be used on connect and read/write
  @type transport: L{Transport} or another compatible class
  @param transport: the underlying transport class to use for the RPC calls
  @type allow_non_master: bool
  @param allow_non_master: skip checks for the master node on errors

  """
  self.version = None

  # No transport object exists until one is created on demand
  self.transport = None
  self.transport_class = transport

  self.timeouts = timeouts
  self.allow_non_master = allow_non_master
192
def _GetAddress(self):
  """Return the address the transport is created with.

  This implementation provides no address and always raises; presumably
  concrete clients override it.

  @raise NotImplementedError: always

  """
  raise NotImplementedError()
198
def _InitTransport(self):
  """Create the transport object unless one already exists.

  The transport class is instantiated with the address returned by
  L{_GetAddress} plus the stored timeouts and C{allow_non_master} flag, and
  the new object is kept in the C{transport} attribute.

  """
  if self.transport is not None:
    # Already initialized, keep using the existing transport
    return

  factory = self.transport_class
  address = self._GetAddress()
  self.transport = factory(address,
                           timeouts=self.timeouts,
                           allow_non_master=self.allow_non_master)
208
def _CloseTransport(self):
  """Close the transport, ignoring errors.

  The C{transport} attribute is reset to C{None} before C{Close()} is
  called on the old transport object, so the client is left without a
  transport even if closing fails. An exception raised while closing is
  logged at debug level and otherwise ignored. Does nothing if there is no
  transport.

  """
  old_transp = self.transport
  if old_transp is None:
    return

  self.transport = None
  try:
    old_transp.Close()
  except Exception:
    # Closing is best-effort and must never fail the caller, but leave a
    # trace so that a misbehaving transport can be diagnosed
    logging.debug("Ignoring error while closing RPC transport",
                  exc_info=True)
221
def _SendMethodCall(self, data):
  """Send C{data} through the transport and return the transport's answer.

  The actual attempt and the cleanup are handed to
  L{t.Transport.RetryOnNetworkError} as two callbacks; presumably the second
  one is invoked on a network error before the first is tried again (see
  that function for the exact retry policy).

  @param data: the message passed to the transport's C{Call} method
  @return: whatever L{t.Transport.RetryOnNetworkError} returns

  """
  def _attempt(try_no):
    # A true attempt number means this is not the first try
    if try_no:
      logging.debug("RPC peer disconnected, retrying")
    self._InitTransport()
    return self.transport.Call(data)

  def _drop_transport(_):
    # The argument passed by the retry helper is not needed
    return self._CloseTransport()

  return t.Transport.RetryOnNetworkError(_attempt, _drop_transport)
231
def Close(self):
  """Shut down the connection held by this client.

  Delegates to L{_CloseTransport}.

  """
  self._CloseTransport()
237
def close(self):
  """Lower-case alias of L{Close}.

  Lets the client be wrapped in contextlib.closing(...), which expects a
  method with this name.

  """
  self.Close()
243
253
254
256 """An abstract Client that connects a generated stub client to a L{Transport}.
257
258 Subclasses should inherit from this class (first) as well and a designated
259 stub (second).
260 """
261
def __init__(self, timeouts=None, transport=t.Transport,
             allow_non_master=None):
  """Initialise the client part of a stub client.

  All arguments are forwarded unchanged, as keyword arguments, to the next
  constructor in the method resolution order; no validation is done here.

  @type timeouts: list of ints
  @param timeouts: timeouts to be used on connect and read/write
  @type transport: L{Transport} or another compatible class
  @param transport: the underlying transport to use for the RPC calls
  @type allow_non_master: bool
  @param allow_non_master: skip checks for the master node on errors

  """
  parent = super(AbstractStubClient, self)
  parent.__init__(timeouts=timeouts,
                  transport=transport,
                  allow_non_master=allow_non_master)
280
283
286