1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module for generic RPC clients.
32
33 """
34
35 import logging
36
37 from ganeti import pathutils
38 import ganeti.rpc.transport as t
39
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError)
43 from ganeti import serializer
44
45 KEY_METHOD = constants.LUXI_KEY_METHOD
46 KEY_ARGS = constants.LUXI_KEY_ARGS
47 KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
48 KEY_RESULT = constants.LUXI_KEY_RESULT
49 KEY_VERSION = constants.LUXI_KEY_VERSION
50
51
53 """Parses a request message.
54
55 """
56 try:
57 request = serializer.LoadJson(msg)
58 except ValueError, err:
59 raise ProtocolError("Invalid RPC request (parsing error): %s" % err)
60
61 logging.debug("RPC request: %s", request)
62
63 if not isinstance(request, dict):
64 logging.error("RPC request not a dict: %r", msg)
65 raise ProtocolError("Invalid RPC request (not a dict)")
66
67 method = request.get(KEY_METHOD, None)
68 args = request.get(KEY_ARGS, None)
69 version = request.get(KEY_VERSION, None)
70
71 if method is None or args is None:
72 logging.error("RPC request missing method or arguments: %r", msg)
73 raise ProtocolError(("Invalid RPC request (no method or arguments"
74 " in request): %r") % msg)
75
76 return (method, args, version)
77
78
99
100
116
117
133
134
136 """Send a RPC request via a transport and return the response.
137
138 """
139 assert callable(transport_cb)
140
141 request_msg = FormatRequest(method, args, version=version)
142
143
144 response_msg = transport_cb(request_msg)
145
146 (success, result, resp_version) = ParseResponse(response_msg)
147
148
149 if resp_version is not None and resp_version != version:
150 raise LuxiError("RPC version mismatch, client %s, response %s" %
151 (version, resp_version))
152
153 if success:
154 return result
155
156 errors.MaybeRaise(result)
157 raise RequestError(result)
158
159
161 """High-level client abstraction.
162
163 This uses a backing Transport-like class on top of which it
164 implements data serialization/deserialization.
165
166 """
167
170 """Constructor for the Client class.
171
172 Arguments:
173 - address: a valid address the the used transport class
174 - timeout: a list of timeouts, to be used on connect and read/write
175 - transport: a Transport-like class
176
177
178 If timeout is not passed, the default timeouts of the transport
179 class are used.
180
181 """
182 if address is None:
183 address = pathutils.MASTER_SOCKET
184 self.address = address
185 self.timeouts = timeouts
186 self.transport_class = transport
187 self.transport = None
188 self._InitTransport()
189
190 self.version = None
191
193 """(Re)initialize the transport if needed.
194
195 """
196 if self.transport is None:
197 self.transport = self.transport_class(self.address,
198 timeouts=self.timeouts)
199
201 """Close the transport, ignoring errors.
202
203 """
204 if self.transport is None:
205 return
206 try:
207 old_transp = self.transport
208 self.transport = None
209 old_transp.Close()
210 except Exception:
211 pass
212
221
223 """Close the underlying connection.
224
225 """
226 self._CloseTransport()
227
229 """Same as L{Close}, to be used with contextlib.closing(...).
230
231 """
232 self.Close()
233
243