1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module that defines a transport for RPC connections.
32
33 A transport can send to and receive messages from some endpoint.
34
35 """
36
37 import collections
38 import errno
39 import io
40 import logging
41 import socket
42 import time
43
44 from ganeti import constants
45 import ganeti.errors
46 from ganeti import ssconf
47 from ganeti import utils
48 from ganeti.rpc import errors
49
50
51 DEF_CTMO = constants.LUXI_DEF_CTMO
52 DEF_RWTO = constants.LUXI_DEF_RWTO
56 """Low-level transport class.
57
58 This is used on the client side.
59
60 This could be replaced by any other class that provides the same
61 semantics to the Client. This means:
62 - can send messages and receive messages
63 - safe for multithreading
64
65 """
66
67 - def __init__(self, address, timeouts=None, allow_non_master=None):
68 """Constructor for the Client class.
69
70 There are two timeouts used since we might want to wait for a long
71 time for a response, but the connect timeout should be lower.
72
73 If not passed, we use the default luxi timeouts from the global
74 constants file.
75
76 Note that on reading data, since the timeout applies to an
77 invidual receive, it might be that the total duration is longer
78 than timeout value passed (we make a hard limit at twice the read
79 timeout).
80
81 @type address: socket address
82 @param address: address the transport connects to
83 @type timeouts: list of ints
84 @param timeouts: timeouts to be used on connect and read/write
85 @type allow_non_master: bool
86 @param allow_non_master: skip checks for the master node on errors
87
88 """
89 self.address = address
90 if timeouts is None:
91 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
92 else:
93 self._ctimeout, self._rwtimeout = timeouts
94
95 self.socket = None
96 self._buffer = ""
97 self._msgs = collections.deque()
98
99 try:
100 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
101
102
103 try:
104 utils.Retry(self._Connect, 1.0, self._ctimeout,
105 args=(self.socket, address, self._ctimeout,
106 allow_non_master))
107 except utils.RetryTimeout:
108 raise errors.TimeoutError("Connect timed out")
109
110 self.socket.settimeout(self._rwtimeout)
111 except (socket.error, errors.NoMasterError):
112 if self.socket is not None:
113 self.socket.close()
114 self.socket = None
115 raise
116
117 @staticmethod
118 - def _Connect(sock, address, timeout, allow_non_master):
144
146 """Make sure we are connected.
147
148 """
149 if self.socket is None:
150 raise errors.ProtocolError("Connection is closed")
151
152 - def Send(self, msg):
167
169 """Try to receive a message from the socket.
170
171 In case we already have messages queued, we just return from the
172 queue. Otherwise, we try to read data with a _rwtimeout network
173 timeout, and making sure we don't go over 2x_rwtimeout as a global
174 limit.
175
176 """
177 self._CheckSocket()
178 etime = time.time() + self._rwtimeout
179 while not self._msgs:
180 if time.time() > etime:
181 raise errors.TimeoutError("Extended receive timeout")
182 while True:
183 try:
184 data = self.socket.recv(4096)
185 except socket.timeout, err:
186 raise errors.TimeoutError("Receive timeout: %s" % str(err))
187 except socket.error, err:
188 if err.args and err.args[0] == errno.EAGAIN:
189 continue
190 raise
191 break
192 if not data:
193 raise errors.ConnectionClosedError("Connection closed while reading")
194 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
195 self._buffer = new_msgs.pop()
196 self._msgs.extend(new_msgs)
197 return self._msgs.popleft()
198
199 - def Call(self, msg):
200 """Send a message and wait for the response.
201
202 This is just a wrapper over Send and Recv.
203
204 """
205 self.Send(msg)
206 return self.Recv()
207
208 @staticmethod
210 """Calls a given function, retrying if it fails on a network IO
211 exception.
212
213 This allows to re-establish a broken connection and retry an IO operation.
214
215 The function receives one an integer argument stating the current retry
216 number, 0 being the first call, 1 being the first retry, 2 the second,
217 and so on.
218
219 If any exception occurs, on_error is invoked first with the exception given
220 as an argument. Then, if the exception is a network exception, the function
221 call is retried once more.
222
223 """
224 for try_no in range(0, retries):
225 try:
226 return fn(try_no)
227 except (socket.error, errors.ConnectionClosedError) as ex:
228 on_error(ex)
229
230 if try_no == retries - 1:
231 raise
232 logging.error("Network error: %s, retring (retry attempt number %d)",
233 ex, try_no + 1)
234 time.sleep(wait_on_error * try_no)
235 except Exception, ex:
236 on_error(ex)
237 raise
238 assert False
239
241 """Close the socket"""
242 if self.socket is not None:
243 self.socket.close()
244 self.socket = None
245
248 """Low-level transport class that works on arbitrary file descriptors.
249
250 Unlike L{Transport}, this doesn't use timeouts.
251 """
252
253 - def __init__(self, fds,
254 timeouts=None, allow_non_master=None):
255 """Constructor for the Client class.
256
257 @type fds: pair of file descriptors
258 @param fds: the file descriptor for reading (the first in the pair)
259 and the file descriptor for writing (the second)
260 @type timeouts: int
261 @param timeouts: unused
262 @type allow_non_master: bool
263 @param allow_non_master: unused
264
265 """
266 self._rstream = io.open(fds[0], 'rb', 0)
267 self._wstream = io.open(fds[1], 'wb', 0)
268
269 self._buffer = ""
270 self._msgs = collections.deque()
271
273 """Make sure we are connected.
274
275 """
276 if self._rstream is None or self._wstream is None:
277 raise errors.ProtocolError("Connection is closed")
278
279 - def Send(self, msg):
291
293 """Try to receive a message from the read part of the socket.
294
295 In case we already have messages queued, we just return from the
296 queue.
297
298 """
299 self._CheckSocket()
300 while not self._msgs:
301 data = self._rstream.read(4096)
302 if not data:
303 raise errors.ConnectionClosedError("Connection closed while reading")
304 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
305 self._buffer = new_msgs.pop()
306 self._msgs.extend(new_msgs)
307 return self._msgs.popleft()
308
309 - def Call(self, msg):
310 """Send a message and wait for the response.
311
312 This is just a wrapper over Send and Recv.
313
314 """
315 self.Send(msg)
316 return self.Recv()
317
319 """Close the socket"""
320 if self._rstream is not None:
321 self._rstream.close()
322 self._rstream = None
323 if self._wstream is not None:
324 self._wstream.close()
325 self._wstream = None
326
329