Package ganeti :: Package http :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """HTTP client module. 
 22   
 23  """ 
 24   
 25  import logging 
 26  import pycurl 
 27  from cStringIO import StringIO 
 28   
 29  from ganeti import http 
 30  from ganeti import compat 
class HttpClientRequest(object):
  """Describes one HTTP request and, once processed, its outcome.

  The request attributes are set by the constructor. The C{success},
  C{error}, C{resp_status_code} and C{resp_body} attributes start out as
  C{None}.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path
    @type headers: list or None
    @param headers: Additional headers to send, list of strings
    @type post_data: string or None
    @param post_data: Additional data to send
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
                         timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection in
                           a way where it wouldn't be efficient to reuse them,
                           a "identity" property should be defined, see
                           L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    if post_data is None:
      self.post_data = ""
    else:
      self.post_data = post_data

    if headers is None:
      self.headers = []
    elif isinstance(headers, dict):
      # Support for old interface
      self.headers = ["%s: %s" % (name, value)
                      for name, value in headers.items()]
    else:
      self.headers = headers

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "%s:%s" % (self.host, self.port),
              self.method,
              self.path]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    A host given as a bare IPv6 literal (e.g. C{::1}) is wrapped in square
    brackets, as otherwise the colons of the address could not be told apart
    from the one separating host and port.

    @rtype: string

    """
    host = self.host

    # Host names and IPv4 addresses never contain a colon, hence a colon
    # identifies an IPv6 literal; one which is already bracketed is kept
    if isinstance(host, str) and ":" in host and not host.startswith("["):
      host = "[%s]" % host

    # TODO: Support for non-SSL requests
    return "https://%s:%s%s" % (host, self.port, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    The identifier is made up of host and port, plus the C{identity}
    attribute of C{curl_config_fn} if the function defines one.

    @rtype: string

    """
    parts = [self.host, self.port]

    if self.curl_config_fn:
      try:
        parts.append(self.curl_config_fn.identity)
      except AttributeError:
        # Config functions are not required to define an identity
        pass

    return "/".join(str(i) for i in parts)
127
class _HttpClient(object):
  """Wrapper around a single cURL handle processing one request at a time.

  """
  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._req = None
    # Only holds a buffer while a request is in progress; defined here so
    # that the attribute exists before the first request is started
    self._resp_buffer = None

    curl = self._CreateCurlHandle()
    curl.setopt(pycurl.VERBOSE, False)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    curl.setopt(pycurl.PROXY, "")

    # Disable SSL session ID caching (pycurl >= 7.16.0)
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    # Pass cURL object to external config function
    if curl_config_fn:
      curl_config_fn(curl)

    self._curl = curl

  @staticmethod
  def _CreateCurlHandle():
    """Returns a new cURL object.

    """
    return pycurl.Curl()

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    @rtype: L{HttpClientRequest} or None

    """
    return self._req

  def StartRequest(self, req):
    """Starts a request on this client.

    Only configures the cURL object; this method does not perform any
    transfer itself.

    @type req: L{HttpClientRequest}
    @param req: HTTP request

    """
    assert not self._req, "Another request is already started"

    self._req = req
    self._resp_buffer = StringIO()

    url = req.url
    method = req.method
    post_data = req.post_data
    headers = req.headers

    # PycURL requires strings to be non-unicode
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(i, str) for i in headers)

    # Configure cURL object for request
    curl = self._curl
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.POSTFIELDS, post_data)
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
    curl.setopt(pycurl.HTTPHEADER, headers)

    if req.read_timeout is None:
      curl.setopt(pycurl.TIMEOUT, 0)
    else:
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

    # Pass cURL object to external config function
    if req.curl_config_fn:
      req.curl_config_fn(curl)

  def Done(self, errmsg):
    """Finishes a request.

    Stores the outcome (success flag, error message, status code and
    response body) on the current request and detaches it from this client.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    req = self._req
    assert req, "No request"

    logging.debug("Request %s finished, errmsg=%s", req, errmsg)

    curl = self._curl

    req.success = not bool(errmsg)
    req.error = errmsg

    # Get HTTP response code
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    req.resp_body = self._resp_buffer.getvalue()

    # Reset client object
    self._req = None
    self._resp_buffer = None

    # Ensure no potentially large variables are referenced
    curl.setopt(pycurl.POSTFIELDS, "")
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
245
class _PooledHttpClient(object):
  """Data structure for HTTP client pool.

  Ties a client to the identity it is pooled under and records the value
  last stored in C{lastused} (initially 0).

  """
  def __init__(self, identity, client):
    """Initializes this class.

    @type identity: string
    @param identity: Client identifier for pool
    @type client: L{_HttpClient}
    @param client: HTTP client

    """
    self.identity = identity
    self.client = client
    self.lastused = 0

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.identity,
              "lastuse=%s" % self.lastused,
              repr(self.client)]

    return "<%s at %#x>" % (" ".join(status), id(self))
271
class HttpClientPool(object):
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    # Incremented once per call to L{ProcessRequests}
    self._generation = 0
    # Idle clients, indexed by identity
    self._pool = {}

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is removed from the pool; if there is none for the given
    identity, a new one is created.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      logging.debug("Created new client %s", pclient)
    else:
      logging.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Pooled client on which the request was started

    """
    logging.debug("Starting request %r", req)
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Afterwards all pooled clients which were last used more than
    L{_MAX_GENERATIONS_DROP} generations ago are removed.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to put back; their identities must not be in
                     the pool already

    """
    for pc in pclients:
      logging.debug("Returning client %s to pool", pc)
      assert pc.identity not in self._pool
      assert pc not in self._pool.values()
      self._pool[pc.identity] = pc

    # Check for unused clients. Entries are removed inside the loop, hence
    # iterate over a snapshot instead of the dictionary's own values.
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        logging.debug("Removing client %s which hasn't been used"
                      " for %s generations",
                      pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    Returns once every request has either succeeded or failed; the outcome
    is stored on the request objects.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests
    @note: Every request takes its own client from the pool and L{_Return}
           asserts that no identity is returned twice, so the requests of
           one call need distinct identities.

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Collect the handles which finished since the last check
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)
453