Package ganeti :: Package http :: Module client
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """HTTP client module. 
 22   
 23  """ 
 24   
 25  import logging 
 26  import pycurl 
 27  from cStringIO import StringIO 
 28   
 29  from ganeti import http 
 30  from ganeti import compat 
 31  from ganeti import netutils 
class HttpClientRequest(object):
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: HTTP method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list, dict or None
    @param headers: Additional headers to send, as a list of strings in
      the form C{"Name: value"}; a dictionary mapping header names to
      values is still accepted for compatibility with the old interface
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an
      empty string
    @type read_timeout: int or None
    @param read_timeout: If passed, used as the read timeout while reading
      the response from the server
    @type curl_config_fn: callable or None
    @param curl_config_fn: Function to configure the cURL object before the
      request (if it configures the connection in a way that makes reusing
      it inefficient, it should define an C{identity} attribute, see
      L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # What to send, and where
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    if post_data is None:
      post_data = ""
    self.post_data = post_data

    if headers is None:
      headers = []
    elif isinstance(headers, dict):
      # Old interface: dictionary of header name to header value
      headers = ["%s: %s" % pair for pair in headers.items()]
    self.headers = headers

    # Outcome of the request, filled in once it has been processed
    self.success = None
    self.error = None
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    cls = self.__class__
    fields = ("%s.%s" % (cls.__module__, cls.__name__),
              "%s:%s" % (self.host, self.port),
              self.method,
              self.path)
    return "<%s at %#x>" % (" ".join(fields), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    Hosts recognised as IP addresses are formatted using
    C{netutils.FormatAddress}; any other host is used verbatim.

    """
    host, port = self.host, self.port
    if not netutils.IPAddress.IsValid(host):
      address = "%s:%s" % (host, port)
    else:
      address = netutils.FormatAddress((host, port))
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    The identifier is built from host and port plus, if present, the
    C{identity} attribute of C{curl_config_fn}. This allows cURL client
    objects to be re-used and to cache information (e.g. SSL session IDs or
    connections).

    """
    parts = [self.host, self.port]

    config_fn = self.curl_config_fn
    if config_fn:
      try:
        extra = config_fn.identity
      except AttributeError:
        # Configuration function doesn't influence connection reuse
        pass
      else:
        parts.append(extra)

    return "/".join(map(str, parts))

class _HttpClient(object):
  """Wraps a single cURL handle which processes one request at a time.

  """
  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    # Request currently being processed, or None
    self._req = None
    # Buffer collecting the response body of the current request; only set
    # between StartRequest and Done
    self._resp_buffer = None

    curl = self._CreateCurlHandle()
    curl.setopt(pycurl.VERBOSE, False)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    # An empty proxy string is meant to disable proxy usage (see libcurl's
    # CURLOPT_PROXY documentation)
    curl.setopt(pycurl.PROXY, "")

    # Disable SSL session ID caching (pycurl >= 7.16.0)
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    # Pass cURL object to external config function
    if curl_config_fn:
      curl_config_fn(curl)

    self._curl = curl

  @staticmethod
  def _CreateCurlHandle():
    """Returns a new cURL object.

    """
    return pycurl.Curl()

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    @rtype: L{HttpClientRequest} or None

    """
    return self._req

  def StartRequest(self, req):
    """Starts a request on this client.

    This only configures the cURL handle; performing the transfer is up to
    the caller. L{Done} must be called once the transfer has finished.

    @type req: L{HttpClientRequest}
    @param req: HTTP request

    """
    assert not self._req, "Another request is already started"

    logging.debug("Starting request %r", req)

    self._req = req
    self._resp_buffer = StringIO()

    url = req.url
    method = req.method
    post_data = req.post_data
    headers = req.headers

    # PycURL requires strings to be non-unicode
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(i, str) for i in headers)

    # Configure cURL object for request
    curl = self._curl
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.POSTFIELDS, post_data)
    # Response body is accumulated in memory until Done is called
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
    curl.setopt(pycurl.HTTPHEADER, headers)

    # The handle may be reused for several requests, so the timeout is always
    # set explicitly (per libcurl, 0 means no timeout)
    if req.read_timeout is None:
      curl.setopt(pycurl.TIMEOUT, 0)
    else:
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

    # Pass cURL object to external config function
    if req.curl_config_fn:
      req.curl_config_fn(curl)

  def Done(self, errmsg):
    """Finishes a request.

    Stores the outcome (success flag, error message, HTTP status code and
    response body) in the current request and resets this client so that it
    can start another request.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    req = self._req
    assert req, "No request"

    logging.debug("Request %s finished, errmsg=%s", req, errmsg)

    curl = self._curl

    req.success = not bool(errmsg)
    req.error = errmsg

    # Get HTTP response code
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    req.resp_body = self._resp_buffer.getvalue()

    # Reset client object
    self._req = None
    self._resp_buffer = None

    # Ensure no potentially large variables are referenced
    curl.setopt(pycurl.POSTFIELDS, "")
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)

253 254 -class _PooledHttpClient:
255 """Data structure for HTTP client pool. 256 257 """
258 - def __init__(self, identity, client):
259 """Initializes this class. 260 261 @type identity: string 262 @param identity: Client identifier for pool 263 @type client: L{_HttpClient} 264 @param client: HTTP client 265 266 """ 267 self.identity = identity 268 self.client = client 269 self.lastused = 0
270
271 - def __repr__(self):
272 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 273 "id=%s" % self.identity, 274 "lastuse=%s" % self.lastused, 275 repr(self.client)] 276 277 return "<%s at %#x>" % (" ".join(status), id(self))
278
class HttpClientPool(object):
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    # Incremented on every call to ProcessRequests; used to detect clients
    # which haven't been used for a while
    self._generation = 0
    # Maps client identity to L{_PooledHttpClient}
    self._pool = {}

    # Create custom logger for HTTP client pool. Change logging level to
    # C{logging.NOTSET} to get more details.
    self._logger = logging.getLogger(self.__class__.__name__)
    self._logger.setLevel(logging.INFO)

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is removed from the pool; if none is pooled for the identity,
    a new one is created.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      self._logger.debug("Created new client %s", pclient)
    else:
      self._logger.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Client the request was started on

    """
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Only one client per identity is kept. If several clients with the same
    identity are returned (possible when L{ProcessRequests} was given more
    than one request with the same identity), the first one is pooled and
    the others are discarded. Afterwards clients which haven't been used for
    more than L{_MAX_GENERATIONS_DROP} generations are removed from the pool.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to return

    """
    for pc in pclients:
      assert pc not in self._pool.values()

      if pc.identity in self._pool:
        # Only one pooled client per identity is supported
        self._logger.debug("Discarding client %s, another client with the"
                           " same identity is already in the pool", pc)
        continue

      self._logger.debug("Returning client %s to pool", pc)
      self._pool[pc.identity] = pc

    # Check for unused clients; iterate over a copy as entries are removed
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        self._logger.debug("Removing client %s which hasn't been used"
                           " for %s generations",
                           pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    Results are stored in the request objects themselves (C{success},
    C{error}, C{resp_status_code}, C{resp_body}).

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    # Requests must not have been processed before
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Collect all finished transfers
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)
