1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module for generic RPC clients.
32
33 """
34
35 import logging
36
37 import ganeti.rpc.transport as t
38
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError)
42 from ganeti import serializer
43
44 KEY_METHOD = constants.LUXI_KEY_METHOD
45 KEY_ARGS = constants.LUXI_KEY_ARGS
46 KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
47 KEY_RESULT = constants.LUXI_KEY_RESULT
48 KEY_VERSION = constants.LUXI_KEY_VERSION
49
50
52 """Parses a request message.
53
54 """
55 try:
56 request = serializer.LoadJson(msg)
57 except ValueError, err:
58 raise ProtocolError("Invalid RPC request (parsing error): %s" % err)
59
60 logging.debug("RPC request: %s", request)
61
62 if not isinstance(request, dict):
63 logging.error("RPC request not a dict: %r", msg)
64 raise ProtocolError("Invalid RPC request (not a dict)")
65
66 method = request.get(KEY_METHOD, None)
67 args = request.get(KEY_ARGS, None)
68 version = request.get(KEY_VERSION, None)
69
70 if method is None or args is None:
71 logging.error("RPC request missing method or arguments: %r", msg)
72 raise ProtocolError(("Invalid RPC request (no method or arguments"
73 " in request): %r") % msg)
74
75 return (method, args, version)
76
77
98
99
115
116
133
134
136 """Send a RPC request via a transport and return the response.
137
138 """
139 assert callable(transport_cb)
140
141 request_msg = FormatRequest(method, args, version=version)
142
143
144 response_msg = transport_cb(request_msg)
145
146 (success, result, resp_version) = ParseResponse(response_msg)
147
148
149 if resp_version is not None and resp_version != version:
150 raise LuxiError("RPC version mismatch, client %s, response %s" %
151 (version, resp_version))
152
153 if success:
154 return result
155
156 errors.MaybeRaise(result)
157 raise RequestError(result)
158
159
161 """High-level client abstraction.
162
163 This uses a backing Transport-like class on top of which it
164 implements data serialization/deserialization.
165
166 """
167
def __init__(self, timeouts=None, transport=t.Transport,
             allow_non_master=False):
    """Record the client configuration without opening a connection.

    This constructor only stores its arguments. The C{transport}
    attribute starts out as C{None}, and no instance of the transport
    class is created here.

    @type timeouts: list of ints
    @param timeouts: timeouts to be used on connect and read/write; if
        not passed, C{None} is stored (the original documentation states
        that the transport class then applies its own defaults)
    @type transport: L{Transport} or another compatible class
    @param transport: the underlying transport class to use for the RPC
        calls
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors

    """
    # No live transport object exists yet.
    self.transport = None
    self.transport_class = transport

    # Settings kept for whenever a transport gets instantiated.
    self.timeouts = timeouts
    self.allow_non_master = allow_non_master

    # No RPC version is selected at construction time.
    self.version = None
189
191 """Returns the socket address
192
193 """
194 raise NotImplementedError
195
197 """(Re)initialize the transport if needed.
198
199 """
200 if self.transport is None:
201 self.transport = \
202 self.transport_class(self._GetAddress(),
203 timeouts=self.timeouts,
204 allow_non_master=self.allow_non_master)
205
207 """Close the transport, ignoring errors.
208
209 """
210 if self.transport is None:
211 return
212 try:
213 old_transp = self.transport
214 self.transport = None
215 old_transp.Close()
216 except Exception:
217 pass
218
220
221 def send(try_no):
222 if try_no:
223 logging.debug("RPC peer disconnected, retrying")
224 self._InitTransport()
225 return self.transport.Call(data)
226 return t.Transport.RetryOnNetworkError(send,
227 lambda _: self._CloseTransport())
228
230 """Close the underlying connection.
231
232 """
233 self._CloseTransport()
234
236 """Same as L{Close}, to be used with contextlib.closing(...).
237
238 """
239 self.Close()
240
250
251
253 """An abstract Client that connects a generated stub client to a L{Transport}.
254
255 Subclasses should inherit from this class (first) as well and a designated
256 stub (second).
257 """
258
def __init__(self, timeouts=None, transport=t.Transport,
             allow_non_master=None):
    """Initialise the stub client by delegating to the next constructor.

    Arguments are the same as for L{AbstractClient}; all three are
    forwarded unchanged as keyword arguments, including the C{None}
    default of C{allow_non_master}. The stub class is expected to
    provide a SOCKET_PATH attribute (per the original documentation);
    this constructor body itself performs no check for it.

    @type timeouts: list of ints
    @param timeouts: timeouts to be used on connect and read/write
    @type transport: L{Transport} or another compatible class
    @param transport: the underlying transport to use for the RPC calls
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors

    """
    # Explicit two-argument super() keeps this valid on Python 2 and
    # continues the lookup after AbstractStubClient in the MRO.
    parent = super(AbstractStubClient, self)
    parent.__init__(timeouts=timeouts,
                    transport=transport,
                    allow_non_master=allow_non_master)
277
280
283