1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module that defines a transport for RPC connections.
32
33 A transport can send to and receive messages from some endpoint.
34
35 """
36
37 import collections
38 import errno
39 import socket
40 import time
41
42 from ganeti import constants
43 from ganeti import utils
44 from ganeti.rpc import errors
45
46
47 DEF_CTMO = constants.LUXI_DEF_CTMO
48 DEF_RWTO = constants.LUXI_DEF_RWTO
52 """Low-level transport class.
53
54 This is used on the client side.
55
56 This could be replace by any other class that provides the same
57 semantics to the Client. This means:
58 - can send messages and receive messages
59 - safe for multithreading
60
61 """
62
63 - def __init__(self, address, timeouts=None):
64 """Constructor for the Client class.
65
66 Arguments:
67 - address: a valid address the the used transport class
68 - timeout: a list of timeouts, to be used on connect and read/write
69
70 There are two timeouts used since we might want to wait for a long
71 time for a response, but the connect timeout should be lower.
72
73 If not passed, we use a default of 10 and respectively 60 seconds.
74
75 Note that on reading data, since the timeout applies to an
76 invidual receive, it might be that the total duration is longer
77 than timeout value passed (we make a hard limit at twice the read
78 timeout).
79
80 """
81 self.address = address
82 if timeouts is None:
83 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
84 else:
85 self._ctimeout, self._rwtimeout = timeouts
86
87 self.socket = None
88 self._buffer = ""
89 self._msgs = collections.deque()
90
91 try:
92 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
93
94
95 try:
96 utils.Retry(self._Connect, 1.0, self._ctimeout,
97 args=(self.socket, address, self._ctimeout))
98 except utils.RetryTimeout:
99 raise errors.TimeoutError("Connect timed out")
100
101 self.socket.settimeout(self._rwtimeout)
102 except (socket.error, errors.NoMasterError):
103 if self.socket is not None:
104 self.socket.close()
105 self.socket = None
106 raise
107
108 @staticmethod
110 sock.settimeout(timeout)
111 try:
112 sock.connect(address)
113 except socket.timeout, err:
114 raise errors.TimeoutError("Connect timed out: %s" % str(err))
115 except socket.error, err:
116 error_code = err.args[0]
117 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
118 raise errors.NoMasterError(address)
119 elif error_code in (errno.EPERM, errno.EACCES):
120 raise errors.PermissionError(address)
121 elif error_code == errno.EAGAIN:
122
123 raise utils.RetryAgain()
124 raise
125
127 """Make sure we are connected.
128
129 """
130 if self.socket is None:
131 raise errors.ProtocolError("Connection is closed")
132
133 - def Send(self, msg):
148
150 """Try to receive a message from the socket.
151
152 In case we already have messages queued, we just return from the
153 queue. Otherwise, we try to read data with a _rwtimeout network
154 timeout, and making sure we don't go over 2x_rwtimeout as a global
155 limit.
156
157 """
158 self._CheckSocket()
159 etime = time.time() + self._rwtimeout
160 while not self._msgs:
161 if time.time() > etime:
162 raise errors.TimeoutError("Extended receive timeout")
163 while True:
164 try:
165 data = self.socket.recv(4096)
166 except socket.timeout, err:
167 raise errors.TimeoutError("Receive timeout: %s" % str(err))
168 except socket.error, err:
169 if err.args and err.args[0] == errno.EAGAIN:
170 continue
171 raise
172 break
173 if not data:
174 raise errors.ConnectionClosedError("Connection closed while reading")
175 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
176 self._buffer = new_msgs.pop()
177 self._msgs.extend(new_msgs)
178 return self._msgs.popleft()
179
180 - def Call(self, msg):
181 """Send a message and wait for the response.
182
183 This is just a wrapper over Send and Recv.
184
185 """
186 self.Send(msg)
187 return self.Recv()
188
190 """Close the socket"""
191 if self.socket is not None:
192 self.socket.close()
193 self.socket = None
194