Package ganeti :: Package rpc :: Module transport
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc.transport

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module that defines a transport for RPC connections. 
 32   
 33  A transport can send to and receive messages from some endpoint. 
 34   
 35  """ 
 36   
 37  import collections 
 38  import errno 
 39  import io 
 40  import logging 
 41  import socket 
 42  import time 
 43   
 44  from ganeti import constants 
 45  import ganeti.errors 
 46  from ganeti import ssconf 
 47  from ganeti import utils 
 48  from ganeti.rpc import errors 
 49   
 50   
  51  DEF_CTMO = constants.LUXI_DEF_CTMO  # connect timeout used by Transport when no timeouts are passed
  52  DEF_RWTO = constants.LUXI_DEF_RWTO  # read/write timeout used by Transport when no timeouts are passed
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, allow_non_master=None):
    """Constructor for the Transport class.

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, the module defaults C{DEF_CTMO} and C{DEF_RWTO} are
    used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    @type address: socket address
    @param address: address the transport connects to
    @type timeouts: list of ints
    @param timeouts: timeouts to be used on connect and read/write
    @type allow_non_master: bool
    @param allow_non_master: skip checks for the master node on errors
    @raise errors.TimeoutError: if connecting does not succeed in time
    @raise errors.NoMasterError: if this node turns out not to be the master
    @raise errors.PermissionError: if the socket cannot be accessed

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout,
                          allow_non_master))
      except utils.RetryTimeout:
        raise errors.TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except BaseException:
      # Release the socket on *any* failure. Besides socket.error and
      # NoMasterError, connecting can also fail with TimeoutError or
      # PermissionError, which must not leak the descriptor either.
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout, allow_non_master):
    """Make a single attempt to connect C{sock} to C{address}.

    Meant to be driven by C{utils.Retry}: transient failures raise
    C{utils.RetryAgain} so the caller tries again.

    @raise errors.TimeoutError: if the connect call times out
    @raise errors.NoMasterError: on a missing/refused socket when this node
        is not the master (only checked if C{allow_non_master} is false)
    @raise errors.PermissionError: on EPERM/EACCES
    @raise utils.RetryAgain: on a missing/refused socket (when retrying is
        allowed) or a full listen backlog (EAGAIN)

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        if not allow_non_master:
          # Verify if we're actually on the master node before trying
          # again.
          ss = ssconf.SimpleStore()
          try:
            master, myself = ssconf.GetMasterAndMyself(ss=ss)
          except ganeti.errors.ConfigurationError:
            raise errors.NoMasterError(address)
          if master != myself:
            raise errors.NoMasterError(address)
        raise utils.RetryAgain()
      elif error_code in (errno.EPERM, errno.EACCES):
        raise errors.PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if the transport has been closed

    """
    if self.socket is None:
      raise errors.ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise errors.ProtocolError: if C{msg} contains the message terminator
        or the transport is closed
    @raise errors.TimeoutError: if sending times out

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # sendall either sends everything or raises; on error it is unknown
      # how much of the message was actually transmitted.
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise errors.TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit. Retries caused by EAGAIN also count against that global limit.

    @raise errors.TimeoutError: on a per-read or overall timeout
    @raise errors.ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise errors.TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise errors.TimeoutError("Receive timeout: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] == errno.EAGAIN:
          # Nothing available yet; loop again so that the overall deadline
          # above is still enforced instead of spinning here forever.
          continue
        raise
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      # The last element is an incomplete message (possibly empty) that
      # stays buffered until its terminator arrives.
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  @staticmethod
  def RetryOnNetworkError(fn, on_error, retries=15, wait_on_error=5):
    """Calls a given function, retrying if it fails on a network IO
    exception.

    This allows to re-establish a broken connection and retry an IO operation.

    The function receives one integer argument stating the current retry
    number, 0 being the first call, 1 being the first retry, 2 the second,
    and so on.

    If any exception occurs, on_error is invoked first with the exception
    given as an argument. If the exception is a network exception
    (C{socket.error} or C{errors.ConnectionClosedError}), the function is
    called again, up to C{retries} calls in total. After call number
    C{try_no} fails, the code sleeps C{wait_on_error * try_no} seconds, so
    the first retry is immediate. Any other exception, or a network
    exception on the last call, is re-raised.

    @return: whatever C{fn} returns on its first successful call

    """
    for try_no in range(retries):
      try:
        return fn(try_no)
      except (socket.error, errors.ConnectionClosedError) as ex:
        on_error(ex)
        # we retry on a network error, unless it's the last try
        if try_no == retries - 1:
          raise
        logging.error("Network error: %s, retrying (retry attempt number %d)",
                      ex, try_no + 1)
        time.sleep(wait_on_error * try_no)
      except Exception as ex:
        on_error(ex)
        raise
    # Only reachable if retries < 1, i.e. fn was never called.
    assert False # we should never get here

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
244
class FdTransport:
  """Low-level transport class that works on arbitrary file descriptors.

  Unlike L{Transport}, this doesn't use timeouts.
  """

  def __init__(self, fds,
               timeouts=None, allow_non_master=None): # pylint: disable=W0613
    """Constructor for the FdTransport class.

    @type fds: pair of file descriptors
    @param fds: the file descriptor for reading (the first in the pair)
        and the file descriptor for writing (the second)
    @type timeouts: list of ints
    @param timeouts: unused; accepted for interface compatibility with
        L{Transport}
    @type allow_non_master: bool
    @param allow_non_master: unused

    """
    # Unbuffered (raw) binary streams: a read returns at most what is
    # currently available, and a write may accept only part of the data.
    self._rstream = io.open(fds[0], 'rb', 0)
    try:
      self._wstream = io.open(fds[1], 'wb', 0)
    except BaseException:
      # Don't leak the already-opened read stream (and its descriptor).
      self._rstream.close()
      self._rstream = None
      raise

    self._buffer = ""
    self._msgs = collections.deque()

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if either stream has been closed

    """
    if self._rstream is None or self._wstream is None:
      raise errors.ProtocolError("Connection is closed")

  def _WriteAll(self, data):
    """Write all of C{data} to the write stream.

    The write stream is unbuffered, so a single C{write} call may accept
    only part of the data; keep writing the remainder until all is sent.

    """
    offset = 0
    while offset < len(data):
      written = self._wstream.write(data[offset:])
      if written is None:
        # A raw stream in non-blocking mode returns None when it would
        # block; the descriptors are presumably blocking, so just retry.
        continue
      offset += written

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise errors.ProtocolError: if C{msg} contains the message terminator
        or the transport is closed

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    self._WriteAll(msg + constants.LUXI_EOM)
    self._wstream.flush()

  def Recv(self):
    """Try to receive a message from the read part of the socket.

    In case we already have messages queued, we just return from the
    queue.

    @raise errors.ConnectionClosedError: if the read side reaches EOF

    """
    self._CheckSocket()
    while not self._msgs:
      data = self._rstream.read(4096)
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      # The last element is an incomplete message (possibly empty) that
      # stays buffered until its terminator arrives.
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    # Detach both streams first so that a failure while closing the read
    # stream still lets the write stream be closed.
    rstream, self._rstream = self._rstream, None
    wstream, self._wstream = self._wstream, None
    try:
      if rstream is not None:
        rstream.close()
    finally:
      if wstream is not None:
        wstream.close()

  def close(self):
    """Lower-case alias for L{Close}."""
    self.Close()
328