Package ganeti :: Package http :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008, 2010 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """HTTP client module. 
 31   
 32  """ 
 33   
 34  import logging 
 35  import pycurl 
 36  import threading 
 37  from cStringIO import StringIO 
 38   
 39  from ganeti import http 
 40  from ganeti import compat 
 41  from ganeti import netutils 
 42  from ganeti import locking 
43 44 45 -class HttpClientRequest(object):
46 - def __init__(self, host, port, method, path, headers=None, post_data=None, 47 read_timeout=None, curl_config_fn=None, nicename=None, 48 completion_cb=None):
49 """Describes an HTTP request. 50 51 @type host: string 52 @param host: Hostname 53 @type port: int 54 @param port: Port 55 @type method: string 56 @param method: Method name 57 @type path: string 58 @param path: Request path 59 @type headers: list or None 60 @param headers: Additional headers to send, list of strings 61 @type post_data: string or None 62 @param post_data: Additional data to send 63 @type read_timeout: int 64 @param read_timeout: if passed, it will be used as the read 65 timeout while reading the response from the server 66 @type curl_config_fn: callable 67 @param curl_config_fn: Function to configure cURL object before request 68 @type nicename: string 69 @param nicename: Name, presentable to a user, to describe this request (no 70 whitespace) 71 @type completion_cb: callable accepting this request object as a single 72 parameter 73 @param completion_cb: Callback for request completion 74 75 """ 76 assert path.startswith("/"), "Path must start with slash (/)" 77 assert curl_config_fn is None or callable(curl_config_fn) 78 assert completion_cb is None or callable(completion_cb) 79 80 # Request attributes 81 self.host = host 82 self.port = port 83 self.method = method 84 self.path = path 85 self.read_timeout = read_timeout 86 self.curl_config_fn = curl_config_fn 87 self.nicename = nicename 88 self.completion_cb = completion_cb 89 90 if post_data is None: 91 self.post_data = "" 92 else: 93 self.post_data = post_data 94 95 if headers is None: 96 self.headers = [] 97 elif isinstance(headers, dict): 98 # Support for old interface 99 self.headers = ["%s: %s" % (name, value) 100 for name, value in headers.items()] 101 else: 102 self.headers = headers 103 104 # Response status 105 self.success = None 106 self.error = None 107 108 # Response attributes 109 self.resp_status_code = None 110 self.resp_body = None
111
112 - def __repr__(self):
113 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), 114 "%s:%s" % (self.host, self.port), 115 self.method, 116 self.path] 117 118 return "<%s at %#x>" % (" ".join(status), id(self))
119 120 @property
121 - def url(self):
122 """Returns the full URL for this requests. 123 124 """ 125 if netutils.IPAddress.IsValid(self.host): 126 address = netutils.FormatAddress((self.host, self.port)) 127 else: 128 address = "%s:%s" % (self.host, self.port) 129 # TODO: Support for non-SSL requests 130 return "https://%s%s" % (address, self.path)
131
132 133 -def _StartRequest(curl, req):
134 """Starts a request on a cURL object. 135 136 @type curl: pycurl.Curl 137 @param curl: cURL object 138 @type req: L{HttpClientRequest} 139 @param req: HTTP request 140 141 """ 142 logging.debug("Starting request %r", req) 143 144 url = req.url 145 method = req.method 146 post_data = req.post_data 147 headers = req.headers 148 149 # PycURL requires strings to be non-unicode 150 assert isinstance(method, str) 151 assert isinstance(url, str) 152 assert isinstance(post_data, str) 153 assert compat.all(isinstance(i, str) for i in headers) 154 155 # Buffer for response 156 resp_buffer = StringIO() 157 158 # Configure client for request 159 curl.setopt(pycurl.VERBOSE, False) 160 curl.setopt(pycurl.NOSIGNAL, True) 161 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION) 162 curl.setopt(pycurl.PROXY, "") 163 curl.setopt(pycurl.CUSTOMREQUEST, str(method)) 164 curl.setopt(pycurl.URL, url) 165 curl.setopt(pycurl.POSTFIELDS, post_data) 166 curl.setopt(pycurl.HTTPHEADER, headers) 167 168 if req.read_timeout is None: 169 curl.setopt(pycurl.TIMEOUT, 0) 170 else: 171 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout)) 172 173 # Disable SSL session ID caching (pycurl >= 7.16.0) 174 if hasattr(pycurl, "SSL_SESSIONID_CACHE"): 175 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False) 176 177 curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write) 178 179 # Pass cURL object to external config function 180 if req.curl_config_fn: 181 req.curl_config_fn(curl) 182 183 return _PendingRequest(curl, req, resp_buffer.getvalue)
184
185 186 -class _PendingRequest(object):
187 - def __init__(self, curl, req, resp_buffer_read):
188 """Initializes this class. 189 190 @type curl: pycurl.Curl 191 @param curl: cURL object 192 @type req: L{HttpClientRequest} 193 @param req: HTTP request 194 @type resp_buffer_read: callable 195 @param resp_buffer_read: Function to read response body 196 197 """ 198 assert req.success is None 199 200 self._curl = curl 201 self._req = req 202 self._resp_buffer_read = resp_buffer_read
203
204 - def GetCurlHandle(self):
205 """Returns the cURL object. 206 207 """ 208 return self._curl
209
210 - def GetCurrentRequest(self):
211 """Returns the current request. 212 213 """ 214 return self._req
215
216 - def Done(self, errmsg):
217 """Finishes a request. 218 219 @type errmsg: string or None 220 @param errmsg: Error message if request failed 221 222 """ 223 curl = self._curl 224 req = self._req 225 226 assert req.success is None, "Request has already been finalized" 227 228 try: 229 # LOCAL_* options added in pycurl 7.21.5 230 from_str = "from %s:%s " % ( 231 req.getinfo(pycurl.LOCAL_IP), 232 req.getinfo(pycurl.LOCAL_PORT) 233 ) 234 except AttributeError: 235 from_str = "" 236 logging.debug("Request %s%s finished, errmsg=%s", from_str, req, errmsg) 237 238 req.success = not bool(errmsg) 239 req.error = errmsg 240 241 # Get HTTP response code 242 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE) 243 req.resp_body = self._resp_buffer_read() 244 245 # Ensure no potentially large variables are referenced 246 curl.setopt(pycurl.POSTFIELDS, "") 247 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) 248 249 if req.completion_cb: 250 req.completion_cb(req)
251
252 253 -class _NoOpRequestMonitor(object): # pylint: disable=W0232
254 """No-op request monitor. 255 256 """ 257 @staticmethod
258 - def acquire(*args, **kwargs):
259 pass
260 261 release = acquire 262 Disable = acquire 263
264 265 -class _PendingRequestMonitor(object):
266 _LOCK = "_lock" 267
268 - def __init__(self, owner, pending_fn):
269 """Initializes this class. 270 271 """ 272 self._owner = owner 273 self._pending_fn = pending_fn 274 275 # The lock monitor runs in another thread, hence locking is necessary 276 self._lock = locking.SharedLock("PendingHttpRequests") 277 self.acquire = self._lock.acquire 278 self.release = self._lock.release
279 280 @locking.ssynchronized(_LOCK)
281 - def Disable(self):
282 """Disable monitor. 283 284 """ 285 self._pending_fn = None
286 287 @locking.ssynchronized(_LOCK, shared=1)
288 - def GetLockInfo(self, requested): # pylint: disable=W0613
289 """Retrieves information about pending requests. 290 291 @type requested: set 292 @param requested: Requested information, see C{query.LQ_*} 293 294 """ 295 # No need to sort here, that's being done by the lock manager and query 296 # library. There are no priorities for requests, hence all show up as 297 # one item under "pending". 298 result = [] 299 300 if self._pending_fn: 301 owner_name = self._owner.getName() 302 303 for client in self._pending_fn(): 304 req = client.GetCurrentRequest() 305 if req: 306 if req.nicename is None: 307 name = "%s%s" % (req.host, req.path) 308 else: 309 name = req.nicename 310 result.append(("rpc/%s" % name, None, [owner_name], None)) 311 312 return result
313
314 315 -def _ProcessCurlRequests(multi, requests):
316 """cURL request processor. 317 318 This generator yields a tuple once for every completed request, successful or 319 not. The first value in the tuple is the handle, the second an error message 320 or C{None} for successful requests. 321 322 @type multi: C{pycurl.CurlMulti} 323 @param multi: cURL multi object 324 @type requests: sequence 325 @param requests: cURL request handles 326 327 """ 328 for curl in requests: 329 multi.add_handle(curl) 330 331 while True: 332 (ret, active) = multi.perform() 333 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM) 334 335 if ret == pycurl.E_CALL_MULTI_PERFORM: 336 # cURL wants to be called again 337 continue 338 339 while True: 340 (remaining_messages, successful, failed) = multi.info_read() 341 342 for curl in successful: 343 multi.remove_handle(curl) 344 yield (curl, None) 345 346 for curl, errnum, errmsg in failed: 347 multi.remove_handle(curl) 348 yield (curl, "Error %s: %s" % (errnum, errmsg)) 349 350 if remaining_messages == 0: 351 break 352 353 if active == 0: 354 # No active handles anymore 355 break 356 357 # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP 358 # timeouts, which are only evaluated in multi.perform, aren't 359 # unnecessarily delayed. 360 multi.select(1.0)
361
362 363 -def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl, 364 _curl_multi=pycurl.CurlMulti, 365 _curl_process=_ProcessCurlRequests):
366 """Processes any number of HTTP client requests. 367 368 @type requests: list of L{HttpClientRequest} 369 @param requests: List of all requests 370 @param lock_monitor_cb: Callable for registering with lock monitor 371 372 """ 373 assert compat.all((req.error is None and 374 req.success is None and 375 req.resp_status_code is None and 376 req.resp_body is None) 377 for req in requests) 378 379 # Prepare all requests 380 curl_to_client = \ 381 dict((client.GetCurlHandle(), client) 382 for client in map(lambda req: _StartRequest(_curl(), req), requests)) 383 384 assert len(curl_to_client) == len(requests) 385 386 if lock_monitor_cb: 387 monitor = _PendingRequestMonitor(threading.currentThread(), 388 curl_to_client.values) 389 lock_monitor_cb(monitor) 390 else: 391 monitor = _NoOpRequestMonitor 392 393 # Process all requests and act based on the returned values 394 for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()): 395 monitor.acquire(shared=0) 396 try: 397 curl_to_client.pop(curl).Done(msg) 398 finally: 399 monitor.release() 400 401 assert not curl_to_client, "Not all requests were processed" 402 403 # Don't try to read information anymore as all requests have been processed 404 monitor.Disable() 405 406 assert compat.all(req.error is not None or 407 (req.success and 408 req.resp_status_code is not None and 409 req.resp_body is not None) 410 for req in requests)
411