Package ganeti :: Package http :: Module client
[hide private]
[frames | no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """HTTP client module. 
 22   
 23  """ 
 24   
 25  # pylint: disable-msg=E1103 
 26   
 27  # # E1103: %s %r has no %r member (but some types could not be 
 28  # inferred), since _socketobject could be ssl or not and pylint 
 29  # doesn't parse that 
 30   
 31   
 32  import os 
 33  import select 
 34  import socket 
 35  import errno 
 36  import threading 
 37   
 38  from ganeti import workerpool 
 39  from ganeti import http 
 40  from ganeti import utils 
 41   
 42   
 43  HTTP_CLIENT_THREADS = 10 
 44   
 45   
class HttpClientRequest(object):
    """Describes one HTTP request and holds the outcome of executing it.

    The constructor only records the request parameters; the result
    attributes (C{success}, C{error}, C{response} and the C{resp_*} fields)
    start out as C{None} and are filled in by L{HttpClientRequestExecutor}.

    """
    def __init__(self, host, port, method, path, headers=None, post_data=None,
                 ssl_params=None, ssl_verify_peer=False):
        """Describes an HTTP request.

        @type host: string
        @param host: Hostname
        @type port: int
        @param port: Port
        @type method: string
        @param method: Method name
        @type path: string
        @param path: Request path
        @type headers: dict or None
        @param headers: Additional headers to send
        @type post_data: string or None
        @param post_data: Additional data to send
        @type ssl_params: HttpSslParams
        @param ssl_params: SSL key and certificate
        @type ssl_verify_peer: bool
        @param ssl_verify_peer: Whether to compare our certificate with
            server's certificate

        """
        if post_data is not None:
            # A request body is only accepted for POST and PUT; the message
            # now names the same two methods the condition checks.
            assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
                "Only POST and PUT requests support sending data"

        assert path.startswith("/"), "Path must start with slash (/)"

        # Request attributes
        self.host = host
        self.port = port
        self.ssl_params = ssl_params
        self.ssl_verify_peer = ssl_verify_peer
        self.method = method
        self.path = path
        self.headers = headers
        self.post_data = post_data

        # Outcome: both stay None until the request has been executed
        self.success = None
        self.error = None

        # Raw response
        self.response = None

        # Response attributes
        self.resp_version = None
        self.resp_status_code = None
        self.resp_reason = None
        self.resp_headers = None
        self.resp_body = None

    def __repr__(self):
        """Returns a debug string with class, host:port, method and path.

        """
        status = ["%s.%s" % (self.__class__.__module__,
                             self.__class__.__name__),
                  "%s:%s" % (self.host, self.port),
                  self.method,
                  self.path]

        return "<%s at %#x>" % (" ".join(status), id(self))
106 107
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
    """Message writer used to send a client request to the server.

    Adds nothing of its own; all behaviour comes from
    L{http.HttpMessageWriter}.

    """
110 111
class _HttpServerToClientMessageReader(http.HttpMessageReader):
    """Message reader for server responses; parses the HTTP status line.

    """
    # Length limits
    START_LINE_LENGTH_MAX = 512
    HEADER_LENGTH_MAX = 4096

    def ParseStartLine(self, start_line):
        """Parses the status line sent by the server.

        @type start_line: string
        @param start_line: Status line; must not be empty
        @rtype: http.HttpServerToClientStartLine
        @return: Start line built from the upper-cased version, the integer
            status code and the reason phrase (empty string if absent)
        @raise http.HttpError: If the status code is missing, not an integer
            or outside the range 100..999

        """
        # Empty lines are skipped when reading
        assert start_line

        try:
            [version, status, reason] = start_line.split(None, 2)
        except ValueError:
            try:
                [version, status] = start_line.split(None, 1)
                reason = ""
            except ValueError:
                # Fewer than two tokens: there is no status code at all.
                # Both names must still be bound, otherwise the int()
                # conversion below fails with UnboundLocalError instead of
                # being reported as an invalid status line.
                version = http.HTTP_0_9
                status = None
                reason = ""

        if version:
            version = version.upper()

        # The status code is a three-digit number
        try:
            status = int(status)
        except (TypeError, ValueError):
            status = -1
        else:
            if status < 100 or status > 999:
                status = -1

        if status == -1:
            raise http.HttpError("Invalid status code (%r)" % start_line)

        return http.HttpServerToClientStartLine(version, status, reason)
148 149
class HttpClientRequestExecutor(http.HttpBase):
    """Executes one L{HttpClientRequest} while being constructed.

    The constructor connects, sends the request, reads the response and
    stores the outcome on the request object. L{http.HttpError} is not
    propagated; it is recorded in C{req.success} and C{req.error}.

    """
    # Default headers
    DEFAULT_HEADERS = {
        http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
        # TODO: For keep-alive, don't send "Connection: close"
        http.HTTP_CONNECTION: "close",
    }

    # Timeouts in seconds for socket layer
    # TODO: Soft timeout instead of only socket timeout?
    # TODO: Make read timeout configurable per OpCode?
    CONNECT_TIMEOUT = 5
    WRITE_TIMEOUT = 10
    READ_TIMEOUT = None
    CLOSE_TIMEOUT = 1

    def __init__(self, req):
        """Initializes the HttpClientRequestExecutor class.

        Runs the whole request. On success C{req.response} and the
        C{req.resp_*} attributes are filled in and C{req.success} is set to
        C{True}; on L{http.HttpError} C{req.success} is C{False} and
        C{req.error} holds the error message.

        @type req: HttpClientRequest
        @param req: Request object

        """
        http.HttpBase.__init__(self)
        self.request = req

        try:
            # TODO: Implement connection caching/keep-alive
            self.sock = self._CreateSocket(req.ssl_params,
                                           req.ssl_verify_peer)

            try:
                # Disable Python's timeout
                self.sock.settimeout(None)

                # Operate in non-blocking mode
                self.sock.setblocking(0)

                response_msg_reader = None
                response_msg = None

                self._Connect()
                try:
                    self._SendRequest()
                    (response_msg_reader, response_msg) = \
                        self._ReadResponse()
                finally:
                    # TODO: Keep-alive is not supported, always close
                    # connection
                    force_close = True
                    http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
                                            self.WRITE_TIMEOUT,
                                            response_msg_reader,
                                            force_close)
            finally:
                # Release the socket on every path, including a failed
                # connect or a failed shutdown
                self.sock.close()
                self.sock = None

            req.response = response_msg

            req.resp_version = req.response.start_line.version
            req.resp_status_code = req.response.start_line.code
            req.resp_reason = req.response.start_line.reason
            req.resp_headers = req.response.headers
            req.resp_body = req.response.body

            req.success = True
            req.error = None

        except http.HttpError as err:
            req.success = False
            req.error = str(err)

    def _Connect(self):
        """Non-blocking connect to host with timeout.

        @raise http.HttpError: If name resolution fails, the connection is
            refused or fails, it is not established within
            C{CONNECT_TIMEOUT} seconds, or the server closes the connection
            during the SSL handshake

        """
        try:
            connect_error = self.sock.connect_ex((self.request.host,
                                                  self.request.port))
        except socket.gaierror as err:
            raise http.HttpError("Connection failed: %s" % str(err))

        # EINTR does not abort the attempt: the connection keeps being
        # established in the background, exactly as with EINPROGRESS, so
        # both are handled by waiting for the socket to become writable.
        if connect_error not in (0, errno.EINPROGRESS, errno.EINTR):
            raise http.HttpError("Connection failed (%s: %s)" %
                                 (connect_error, os.strerror(connect_error)))

        if connect_error != 0:
            # Wait for connection
            event = utils.WaitForFdCondition(self.sock, select.POLLOUT,
                                             self.CONNECT_TIMEOUT)
            if event is None:
                raise http.HttpError("Timeout while connecting to server")

            # Get error code
            connect_error = self.sock.getsockopt(socket.SOL_SOCKET,
                                                 socket.SO_ERROR)
            if connect_error != 0:
                raise http.HttpError("Connection failed (%s: %s)" %
                                     (connect_error,
                                      os.strerror(connect_error)))

        # Enable TCP keep-alive
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # If needed, Linux specific options are available to change the TCP
        # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE
        # and TCP_KEEPINTVL.

        # Do the secret SSL handshake
        if self.using_ssl:
            self.sock.set_connect_state() # pylint: disable-msg=E1103
            try:
                http.Handshake(self.sock, self.WRITE_TIMEOUT)
            except http.HttpSessionHandshakeUnexpectedEOF:
                raise http.HttpError("Server closed connection during SSL"
                                     " handshake")

    def _SendRequest(self):
        """Sends request to server.

        Headers are built from C{DEFAULT_HEADERS}, overridden by the
        request's own headers; the C{Host} header is always set from the
        request's host and port.

        @raise http.HttpError: On timeout or socket error while sending

        """
        # Headers
        send_headers = self.DEFAULT_HEADERS.copy()

        if self.request.headers:
            send_headers.update(self.request.headers)

        send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
                                                  self.request.port)

        # Request message
        msg = http.HttpMessage()

        # Combine request line. We only support HTTP/1.0 (no chunked
        # transfers and no keep-alive).
        # TODO: For keep-alive, change to HTTP/1.1
        msg.start_line = \
            http.HttpClientToServerStartLine(
                method=self.request.method.upper(),
                path=self.request.path,
                version=http.HTTP_1_0)
        msg.headers = send_headers
        msg.body = self.request.post_data

        try:
            _HttpClientToServerMessageWriter(self.sock, msg,
                                             self.WRITE_TIMEOUT)
        except http.HttpSocketTimeout:
            raise http.HttpError("Timeout while sending request")
        except socket.error as err:
            raise http.HttpError("Error sending request: %s" % err)

    def _ReadResponse(self):
        """Read response from server.

        @rtype: tuple
        @return: (message reader, response message)
        @raise http.HttpError: On timeout or socket error while reading

        """
        response_msg = http.HttpMessage()

        try:
            response_msg_reader = \
                _HttpServerToClientMessageReader(self.sock, response_msg,
                                                 self.READ_TIMEOUT)
        except http.HttpSocketTimeout:
            raise http.HttpError("Timeout while reading response")
        except socket.error as err:
            raise http.HttpError("Error reading response: %s" % err)

        return (response_msg_reader, response_msg)
328 329
class _HttpClientPendingRequest(object):
    """Pairs a request with the event that signals its completion.

    """
    def __init__(self, request):
        """Stores the request and creates an unset completion event.

        @param request: The request this entry stands for

        """
        # Thread synchronization: the worker sets this event once the
        # request has been processed
        self.done = threading.Event()

        self.request = request

    def __repr__(self):
        """Returns a debug string with the class name and wrapped request.

        """
        cls = self.__class__
        qualified_name = "%s.%s" % (cls.__module__, cls.__name__)
        description = " ".join([qualified_name, "req=%r" % self.request])

        return "<%s at %#x>" % (description, id(self))
345 346
class HttpClientWorker(workerpool.BaseWorker):
    """Worker-pool worker that executes pending HTTP requests.

    """
    def RunTask(self, pend_req): # pylint: disable-msg=W0221
        """Executes one pending request and signals its completion.

        @type pend_req: _HttpClientPendingRequest
        @param pend_req: Entry whose C{request} is executed and whose
            C{done} event is set afterwards

        """
        try:
            # Constructing the executor performs the whole request
            HttpClientRequestExecutor(pend_req.request)
        finally:
            # Always signal completion, even if execution raised, so that
            # whoever waits on the event is not left blocked
            pend_req.done.set()
356 357
class HttpClientWorkerPool(workerpool.WorkerPool):
    """Worker pool using L{HttpClientWorker} as its worker class.

    """
    def __init__(self, manager):
        """Initializes the pool and remembers the owning manager.

        The base class is given the name "HttpClient", the
        C{HTTP_CLIENT_THREADS} constant and the L{HttpClientWorker} class.

        @param manager: Stored as C{self.manager}

        """
        workerpool.WorkerPool.__init__(self,
                                       "HttpClient",
                                       HTTP_CLIENT_THREADS,
                                       HttpClientWorker)
        self.manager = manager
364 365
class HttpClientManager(object):
    """Manages HTTP requests.

    Owns a L{HttpClientWorkerPool} and hands batches of requests to it.

    """
    def __init__(self):
        """Creates the worker pool used to execute requests.

        """
        self._wpool = HttpClientWorkerPool(self)

    def __del__(self):
        """Shuts the worker pool down when the manager is destroyed.

        """
        self.Shutdown()

    def ExecRequests(self, requests):
        """Execute HTTP requests.

        This function can be called from multiple threads at the same time.

        @type requests: List of HttpClientRequest instances
        @param requests: The requests to execute
        @rtype: List of HttpClientRequest instances
        @return: The list of requests passed in

        """
        # _HttpClientPendingRequest is used for internal thread
        # synchronization
        pending = [_HttpClientPendingRequest(req) for req in requests]

        # Only requests actually handed to the pool ever get their "done"
        # event set; waiting for any other one would block forever.
        queued = []

        try:
            # Add requests to queue
            for pend_req in pending:
                self._wpool.AddTask(pend_req)
                queued.append(pend_req)

        finally:
            # In case of an exception we should still wait for the requests
            # already queued, otherwise another thread from the worker pool
            # could modify the request object after we returned.
            for pend_req in queued:
                pend_req.done.wait()

        # Return original list
        return requests

    def Shutdown(self):
        """Quiesces the worker pool and then terminates its workers.

        """
        self._wpool.Quiesce()
        self._wpool.TerminateWorkers()
410