Package ganeti :: Package rpc :: Module transport
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc.transport

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module that defines a transport for RPC connections. 
 32   
 33  A transport can send to and receive messages from some endpoint. 
 34   
 35  """ 
 36   
 37  import collections 
 38  import errno 
 39  import io 
 40  import logging 
 41  import socket 
 42  import time 
 43   
 44  from ganeti import constants 
 45  import ganeti.errors 
 46  from ganeti import ssconf 
 47  from ganeti import utils 
 48  from ganeti.rpc import errors 
 49   
 50   
 51  DEF_CTMO = constants.LUXI_DEF_CTMO 
 52  DEF_RWTO = constants.LUXI_DEF_RWTO 
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  NOTE(review): this class itself takes no locks; confirm that callers
  serialize access to a single instance.

  """

  def __init__(self, address, timeouts=None, allow_non_master=None):
    """Constructor for the Transport class.

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use the default luxi timeouts from the global
    constants file.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    @type address: socket address
    @param address: address the transport connects to
    @type timeouts: two-element sequence
    @param timeouts: (connect timeout, read/write timeout)
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors
    @raise errors.TimeoutError: if no connection is established in time
    @raise errors.NoMasterError: if the socket is missing/refusing and this
        node is not (or cannot be confirmed to be) the master
    @raise errors.PermissionError: if connecting fails with EPERM/EACCES

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect; _Connect signals "try again" via utils.RetryAgain
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout,
                          allow_non_master))
      except utils.RetryTimeout:
        raise errors.TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except BaseException:
      # Release the socket on *any* failure -- including TimeoutError and
      # PermissionError raised above, not only socket errors -- so a failed
      # constructor never leaks a file descriptor.
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout, allow_non_master):
    """Make a single connection attempt, for use with L{utils.Retry}.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout applied to the connect call
    @param allow_non_master: if true, don't check whether this node is the
        master before asking for a retry
    @raise utils.RetryAgain: if the attempt should be retried
    @raise errors.TimeoutError: if the connect call itself timed out
    @raise errors.NoMasterError: if this node is not the master
    @raise errors.PermissionError: on EPERM/EACCES

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against an error carrying no args, as Recv does
      error_code = err.args[0] if err.args else None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        if not allow_non_master:
          # Verify if we're actually on the master node before trying
          # again.
          ss = ssconf.SimpleStore()
          try:
            master, myself = ssconf.GetMasterAndMyself(ss=ss)
          except ganeti.errors.ConfigurationError:
            raise errors.NoMasterError(address)
          if master != myself:
            raise errors.NoMasterError(address)
        raise utils.RetryAgain()
      elif error_code in (errno.EPERM, errno.EACCES):
        raise errors.PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if the transport has been closed

    """
    if self.socket is None:
      raise errors.ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise errors.ProtocolError: if the payload contains the terminator or
        the transport is closed
    @raise errors.TimeoutError: if sending times out

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # sendall() keeps sending until all data is written or an error is
      # raised; on error the amount already sent is unknown, so the stream
      # may be left holding a partial message.
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise errors.TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @raise errors.TimeoutError: if a single read times out or the overall
        deadline passes before a complete message arrives
    @raise errors.ConnectionClosedError: if the peer closes the connection
        before a complete message arrives

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      # Checked before every read, including retries after EAGAIN, so a
      # run of EAGAIN errors cannot spin past the deadline.
      if time.time() > etime:
        raise errors.TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise errors.TimeoutError("Receive timeout: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] == errno.EAGAIN:
          continue
        raise
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last piece has no terminator yet (it may be empty); keep it
      # buffered until the rest of it arrives.
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  @staticmethod
  def RetryOnNetworkError(fn, on_error, retries=15, wait_on_error=5):
    """Calls a given function, retrying if it fails on a network IO
    exception.

    This allows to re-establish a broken connection and retry an IO operation.

    The function receives one integer argument stating the current try
    number, 0 being the first call, 1 being the first retry, 2 the second,
    and so on.

    If any exception occurs, on_error is invoked first with the exception
    given as an argument. A network exception (socket.error or
    errors.ConnectionClosedError) is then retried, for at most C{retries}
    calls in total, sleeping C{wait_on_error * try_no} seconds before each
    retry (so the first retry is immediate); on the last try it is
    re-raised. Any other exception is re-raised immediately.

    @raise ValueError: if C{retries} is less than 1

    """
    if retries < 1:
      raise ValueError("retries must be at least 1, got %r" % (retries, ))
    for try_no in range(retries):
      try:
        return fn(try_no)
      except (socket.error, errors.ConnectionClosedError) as ex:
        on_error(ex)
        # we retry on a network error, unless it's the last try
        if try_no == retries - 1:
          raise
        logging.error("Network error: %s, retrying (retry attempt number %d)",
                      ex, try_no + 1)
        time.sleep(wait_on_error * try_no)
      except Exception as ex:
        on_error(ex)
        raise
    # Every iteration either returns or raises, and retries >= 1
    raise AssertionError("Retry loop exited without a result")

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
class FdTransport:
  """Low-level transport class that works on arbitrary file descriptors.

  Unlike L{Transport}, this doesn't use timeouts.
  """

  def __init__(self, fds,
               timeouts=None, allow_non_master=None): # pylint: disable=W0613
    """Constructor for the FdTransport class.

    Both descriptors are wrapped with C{io.open(..., closefd=True)} (the
    default), so closing this transport closes them.

    @type fds: pair of file descriptors
    @param fds: the file descriptor for reading (the first in the pair)
        and the file descriptor for writing (the second)
    @type timeouts: int
    @param timeouts: unused
    @type allow_non_master: bool
    @param allow_non_master: unused

    """
    # Unbuffered (raw) streams: data is handed straight to the descriptors
    self._rstream = io.open(fds[0], 'rb', 0)
    try:
      self._wstream = io.open(fds[1], 'wb', 0)
    except BaseException:
      # Don't leak the already-opened read stream (and its descriptor)
      self._rstream.close()
      self._rstream = None
      raise

    self._buffer = ""
    self._msgs = collections.deque()

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if either stream has been closed

    """
    if self._rstream is None or self._wstream is None:
      raise errors.ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise errors.ProtocolError: if the payload contains the terminator or
        the transport is closed

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    data = msg + constants.LUXI_EOM
    # A raw (unbuffered) stream's write() may accept only part of the data,
    # so keep writing until everything has been handed over.
    # NOTE(review): write() returns None only for a non-blocking descriptor
    # that can take no data right now; this loop then retries, which
    # assumes blocking descriptors as the "no timeouts" design implies.
    while data:
      written = self._wstream.write(data)
      data = data[written or 0:]
    self._wstream.flush()

  def Recv(self):
    """Try to receive a message from the read part of the socket.

    In case we already have messages queued, we just return from the
    queue.

    @raise errors.ConnectionClosedError: if the read side reports no data
        before a complete message arrives

    """
    self._CheckSocket()
    while not self._msgs:
      data = self._rstream.read(4096)
      # An empty result means EOF; a raw read() can also return None for a
      # non-blocking descriptor, which is treated the same way here.
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # Keep the trailing, not-yet-terminated piece for the next read
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close both streams; safe to call more than once.

    The references are dropped first and the write stream is closed even
    if closing the read stream raises.

    """
    rstream, self._rstream = self._rstream, None
    wstream, self._wstream = self._wstream, None
    try:
      if rstream is not None:
        rstream.close()
    finally:
      if wstream is not None:
        wstream.close()

  def close(self):
    """Alias for L{Close}."""
    self.Close()