1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module that defines a transport for RPC connections.
32
33 A transport can send to and receive messages from some endpoint.
34
35 """
36
37 import collections
38 import errno
39 import io
40 import logging
41 import socket
42 import time
43
44 from ganeti import constants
45 import ganeti.errors
46 from ganeti import ssconf
47 from ganeti import utils
48 from ganeti.rpc import errors
49
50
51 DEF_CTMO = constants.LUXI_DEF_CTMO
52 DEF_RWTO = constants.LUXI_DEF_RWTO
56 """Low-level transport class.
57
58 This is used on the client side.
59
60 This could be replaced by any other class that provides the same
61 semantics to the Client. This means:
62 - can send messages and receive messages
63 - safe for multithreading
64
65 """
66
67 - def __init__(self, address, timeouts=None, allow_non_master=None):
68 """Constructor for the Client class.
69
70 There are two timeouts used since we might want to wait for a long
71 time for a response, but the connect timeout should be lower.
72
73 If not passed, we use a default of 10 and respectively 60 seconds.
74
75 Note that on reading data, since the timeout applies to an
76 invidual receive, it might be that the total duration is longer
77 than timeout value passed (we make a hard limit at twice the read
78 timeout).
79
80 @type address: socket address
81 @param address: address the transport connects to
82 @type timeouts: list of ints
83 @param timeouts: timeouts to be used on connect and read/write
84 @type allow_non_master: bool
85 @param allow_non_master: skip checks for the master node on errors
86
87 """
88 self.address = address
89 if timeouts is None:
90 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
91 else:
92 self._ctimeout, self._rwtimeout = timeouts
93
94 self.socket = None
95 self._buffer = ""
96 self._msgs = collections.deque()
97
98 try:
99 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
100
101
102 try:
103 utils.Retry(self._Connect, 1.0, self._ctimeout,
104 args=(self.socket, address, self._ctimeout,
105 allow_non_master))
106 except utils.RetryTimeout:
107 raise errors.TimeoutError("Connect timed out")
108
109 self.socket.settimeout(self._rwtimeout)
110 except (socket.error, errors.NoMasterError):
111 if self.socket is not None:
112 self.socket.close()
113 self.socket = None
114 raise
115
116 @staticmethod
117 - def _Connect(sock, address, timeout, allow_non_master):
143
145 """Make sure we are connected.
146
147 """
148 if self.socket is None:
149 raise errors.ProtocolError("Connection is closed")
150
151 - def Send(self, msg):
166
168 """Try to receive a message from the socket.
169
170 In case we already have messages queued, we just return from the
171 queue. Otherwise, we try to read data with a _rwtimeout network
172 timeout, and making sure we don't go over 2x_rwtimeout as a global
173 limit.
174
175 """
176 self._CheckSocket()
177 etime = time.time() + self._rwtimeout
178 while not self._msgs:
179 if time.time() > etime:
180 raise errors.TimeoutError("Extended receive timeout")
181 while True:
182 try:
183 data = self.socket.recv(4096)
184 except socket.timeout, err:
185 raise errors.TimeoutError("Receive timeout: %s" % str(err))
186 except socket.error, err:
187 if err.args and err.args[0] == errno.EAGAIN:
188 continue
189 raise
190 break
191 if not data:
192 raise errors.ConnectionClosedError("Connection closed while reading")
193 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
194 self._buffer = new_msgs.pop()
195 self._msgs.extend(new_msgs)
196 return self._msgs.popleft()
197
198 - def Call(self, msg):
199 """Send a message and wait for the response.
200
201 This is just a wrapper over Send and Recv.
202
203 """
204 self.Send(msg)
205 return self.Recv()
206
207 @staticmethod
209 """Calls a given function, retrying if it fails on a network IO
210 exception.
211
212 This allows to re-establish a broken connection and retry an IO operation.
213
214 The function receives one an integer argument stating the current retry
215 number, 0 being the first call, 1 being the first retry, 2 the second,
216 and so on.
217
218 If any exception occurs, on_error is invoked first with the exception given
219 as an argument. Then, if the exception is a network exception, the function
220 call is retried once more.
221
222 """
223 for try_no in range(0, retries):
224 try:
225 return fn(try_no)
226 except (socket.error, errors.ConnectionClosedError) as ex:
227 on_error(ex)
228
229 if try_no == retries - 1:
230 raise
231 logging.error("Network error: %s, retring (retry attempt number %d)",
232 ex, try_no + 1)
233 time.sleep(wait_on_error * try_no)
234 except Exception, ex:
235 on_error(ex)
236 raise
237 assert False
238
240 """Close the socket"""
241 if self.socket is not None:
242 self.socket.close()
243 self.socket = None
244
247 """Low-level transport class that works on arbitrary file descriptors.
248
249 Unlike L{Transport}, this doesn't use timeouts.
250 """
251
252 - def __init__(self, fds,
253 timeouts=None, allow_non_master=None):
254 """Constructor for the Client class.
255
256 @type fds: pair of file descriptors
257 @param fds: the file descriptor for reading (the first in the pair)
258 and the file descriptor for writing (the second)
259 @type timeouts: int
260 @param timeouts: unused
261 @type allow_non_master: bool
262 @param allow_non_master: unused
263
264 """
265 self._rstream = io.open(fds[0], 'rb', 0)
266 self._wstream = io.open(fds[1], 'wb', 0)
267
268 self._buffer = ""
269 self._msgs = collections.deque()
270
272 """Make sure we are connected.
273
274 """
275 if self._rstream is None or self._wstream is None:
276 raise errors.ProtocolError("Connection is closed")
277
278 - def Send(self, msg):
290
292 """Try to receive a message from the read part of the socket.
293
294 In case we already have messages queued, we just return from the
295 queue.
296
297 """
298 self._CheckSocket()
299 while not self._msgs:
300 data = self._rstream.read(4096)
301 if not data:
302 raise errors.ConnectionClosedError("Connection closed while reading")
303 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
304 self._buffer = new_msgs.pop()
305 self._msgs.extend(new_msgs)
306 return self._msgs.popleft()
307
308 - def Call(self, msg):
309 """Send a message and wait for the response.
310
311 This is just a wrapper over Send and Recv.
312
313 """
314 self.Send(msg)
315 return self.Recv()
316
318 """Close the socket"""
319 if self._rstream is not None:
320 self._rstream.close()
321 self._rstream = None
322 if self._wstream is not None:
323 self._wstream.close()
324 self._wstream = None
325
328