Package ganeti :: Package rpc :: Module transport
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc.transport

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module that defines a transport for RPC connections. 
 32   
 33  A transport can send to and receive messages from some endpoint. 
 34   
 35  """ 
 36   
 37  import collections 
 38  import errno 
 39  import socket 
 40  import time 
 41   
 42  from ganeti import constants 
 43  from ganeti import utils 
 44  from ganeti.rpc import errors 
 45   
 46   
 47  DEF_CTMO = constants.LUXI_DEF_CTMO 
 48  DEF_RWTO = constants.LUXI_DEF_RWTO 
49 50 51 -class Transport:
52 """Low-level transport class. 53 54 This is used on the client side. 55 56 This could be replace by any other class that provides the same 57 semantics to the Client. This means: 58 - can send messages and receive messages 59 - safe for multithreading 60 61 """ 62
63 - def __init__(self, address, timeouts=None):
64 """Constructor for the Client class. 65 66 Arguments: 67 - address: a valid address the the used transport class 68 - timeout: a list of timeouts, to be used on connect and read/write 69 70 There are two timeouts used since we might want to wait for a long 71 time for a response, but the connect timeout should be lower. 72 73 If not passed, we use a default of 10 and respectively 60 seconds. 74 75 Note that on reading data, since the timeout applies to an 76 invidual receive, it might be that the total duration is longer 77 than timeout value passed (we make a hard limit at twice the read 78 timeout). 79 80 """ 81 self.address = address 82 if timeouts is None: 83 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO 84 else: 85 self._ctimeout, self._rwtimeout = timeouts 86 87 self.socket = None 88 self._buffer = "" 89 self._msgs = collections.deque() 90 91 try: 92 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 93 94 # Try to connect 95 try: 96 utils.Retry(self._Connect, 1.0, self._ctimeout, 97 args=(self.socket, address, self._ctimeout)) 98 except utils.RetryTimeout: 99 raise errors.TimeoutError("Connect timed out") 100 101 self.socket.settimeout(self._rwtimeout) 102 except (socket.error, errors.NoMasterError): 103 if self.socket is not None: 104 self.socket.close() 105 self.socket = None 106 raise
107 108 @staticmethod
109 - def _Connect(sock, address, timeout):
110 sock.settimeout(timeout) 111 try: 112 sock.connect(address) 113 except socket.timeout, err: 114 raise errors.TimeoutError("Connect timed out: %s" % str(err)) 115 except socket.error, err: 116 error_code = err.args[0] 117 if error_code in (errno.ENOENT, errno.ECONNREFUSED): 118 raise errors.NoMasterError(address) 119 elif error_code in (errno.EPERM, errno.EACCES): 120 raise errors.PermissionError(address) 121 elif error_code == errno.EAGAIN: 122 # Server's socket backlog is full at the moment 123 raise utils.RetryAgain() 124 raise
125
126 - def _CheckSocket(self):
127 """Make sure we are connected. 128 129 """ 130 if self.socket is None: 131 raise errors.ProtocolError("Connection is closed")
132
133 - def Send(self, msg):
134 """Send a message. 135 136 This just sends a message and doesn't wait for the response. 137 138 """ 139 if constants.LUXI_EOM in msg: 140 raise errors.ProtocolError("Message terminator found in payload") 141 142 self._CheckSocket() 143 try: 144 # TODO: sendall is not guaranteed to send everything 145 self.socket.sendall(msg + constants.LUXI_EOM) 146 except socket.timeout, err: 147 raise errors.TimeoutError("Sending timeout: %s" % str(err))
148
149 - def Recv(self):
150 """Try to receive a message from the socket. 151 152 In case we already have messages queued, we just return from the 153 queue. Otherwise, we try to read data with a _rwtimeout network 154 timeout, and making sure we don't go over 2x_rwtimeout as a global 155 limit. 156 157 """ 158 self._CheckSocket() 159 etime = time.time() + self._rwtimeout 160 while not self._msgs: 161 if time.time() > etime: 162 raise errors.TimeoutError("Extended receive timeout") 163 while True: 164 try: 165 data = self.socket.recv(4096) 166 except socket.timeout, err: 167 raise errors.TimeoutError("Receive timeout: %s" % str(err)) 168 except socket.error, err: 169 if err.args and err.args[0] == errno.EAGAIN: 170 continue 171 raise 172 break 173 if not data: 174 raise errors.ConnectionClosedError("Connection closed while reading") 175 new_msgs = (self._buffer + data).split(constants.LUXI_EOM) 176 self._buffer = new_msgs.pop() 177 self._msgs.extend(new_msgs) 178 return self._msgs.popleft()
179
180 - def Call(self, msg):
181 """Send a message and wait for the response. 182 183 This is just a wrapper over Send and Recv. 184 185 """ 186 self.Send(msg) 187 return self.Recv()
188
189 - def Close(self):
190 """Close the socket""" 191 if self.socket is not None: 192 self.socket.close() 193 self.socket = None
194