Package ganeti :: Package http :: Module client
[hide private]
[frames | no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008, 2010 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """HTTP client module. 
 31   
 32  """ 
 33   
 34  import logging 
 35  import pycurl 
 36  import threading 
 37  from cStringIO import StringIO 
 38   
 39  from ganeti import http 
 40  from ganeti import compat 
 41  from ganeti import netutils 
 42  from ganeti import locking 
class HttpClientRequest(object):
  """Container for one HTTP request and, once processed, its response.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None, nicename=None,
               completion_cb=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings (a
      dictionary mapping header names to values is converted to such a list)
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an empty
      string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
      timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
    @type nicename: string
    @param nicename: Name, presentable to a user, to describe this request (no
      whitespace)
    @type completion_cb: callable accepting this request object as a single
      parameter
    @param completion_cb: Callback for request completion

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)
    assert completion_cb is None or callable(completion_cb)

    # Where the request goes and what it asks for
    self.host = host
    self.port = port
    self.method = method
    self.path = path

    # Optional behaviour
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.nicename = nicename
    self.completion_cb = completion_cb

    # A missing body is represented by an empty string
    self.post_data = "" if post_data is None else post_data

    if isinstance(headers, dict):
      # Support for old interface
      headers = ["%s: %s" % item for item in headers.items()]
    elif headers is None:
      headers = []
    self.headers = headers

    # Outcome of the request, all unset until the request was processed
    self.success = None
    self.error = None
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    """Returns a description containing class, target, method and path.

    """
    cls = self.__class__
    parts = ("%s.%s" % (cls.__module__, cls.__name__),
             "%s:%s" % (self.host, self.port),
             self.method,
             self.path)

    return "<%s at %#x>" % (" ".join(parts), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    """
    if not netutils.IPAddress.IsValid(self.host):
      # Not an IP address, plain "host:port" notation is used
      address = "%s:%s" % (self.host, self.port)
    else:
      address = netutils.FormatAddress((self.host, self.port))
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)
131
def _StartRequest(curl, req):
  """Configures a cURL object for a request and wraps it as pending request.

  Nothing is transferred here; the cURL object is only set up.

  @type curl: pycurl.Curl
  @param curl: cURL object
  @type req: L{HttpClientRequest}
  @param req: HTTP request
  @rtype: L{_PendingRequest}
  @return: Pending request tying together cURL object, request and the
    function returning the collected response body

  """
  logging.debug("Starting request %r", req)

  request_url = req.url
  request_method = req.method
  request_body = req.post_data
  request_headers = req.headers

  # PycURL requires strings to be non-unicode
  assert isinstance(request_method, str)
  assert isinstance(request_url, str)
  assert isinstance(request_body, str)
  assert compat.all(isinstance(i, str) for i in request_headers)

  # The response body is collected in memory
  body_buffer = StringIO()

  # Options applying to every request, set in exactly this order
  for (option, value) in [
    (pycurl.VERBOSE, False),
    (pycurl.NOSIGNAL, True),
    (pycurl.USERAGENT, http.HTTP_GANETI_VERSION),
    (pycurl.PROXY, ""),
    (pycurl.CUSTOMREQUEST, str(request_method)),
    (pycurl.URL, request_url),
    (pycurl.POSTFIELDS, request_body),
    (pycurl.HTTPHEADER, request_headers),
    ]:
    curl.setopt(option, value)

  # Without a read timeout, zero is passed as the timeout value
  if req.read_timeout is not None:
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
  else:
    curl.setopt(pycurl.TIMEOUT, 0)

  # Disable SSL session ID caching (pycurl >= 7.16.0)
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

  curl.setopt(pycurl.WRITEFUNCTION, body_buffer.write)

  # Give the caller-supplied function the last word on the configuration
  if req.curl_config_fn:
    req.curl_config_fn(curl)

  return _PendingRequest(curl, req, body_buffer.getvalue)
184
class _PendingRequest(object):
  """Ties a cURL object to the request it is processing.

  """
  def __init__(self, curl, req, resp_buffer_read):
    """Initializes this class.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request, must not have been finalized yet
    @type resp_buffer_read: callable
    @param resp_buffer_read: Function to read response body

    """
    assert req.success is None

    self._curl = curl
    self._req = req
    self._resp_buffer_read = resp_buffer_read

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    """
    return self._req

  def Done(self, errmsg):
    """Finishes a request.

    Stores outcome, status code and body on the request object and then
    invokes the request's completion callback, if there is one.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    (handle, request) = (self._curl, self._req)

    assert request.success is None, "Request has already been finalized"

    logging.debug("Request %s finished, errmsg=%s", request, errmsg)

    # An empty or missing error message means the request succeeded
    request.success = not errmsg
    request.error = errmsg

    # Get HTTP response code
    request.resp_status_code = handle.getinfo(pycurl.RESPONSE_CODE)
    request.resp_body = self._resp_buffer_read()

    # Ensure no potentially large variables are referenced
    handle.setopt(pycurl.POSTFIELDS, "")
    handle.setopt(pycurl.WRITEFUNCTION, lambda _: None)

    callback = request.completion_cb
    if callback:
      callback(request)
243
class _NoOpRequestMonitor(object): # pylint: disable=W0232
  """Request monitor whose operations do nothing.

  Offers C{acquire}, C{release} and C{Disable} so it can be used where no
  real monitor was requested.

  """
  @staticmethod
  def acquire(*args, **kwargs):
    """Accepts any arguments and does nothing.

    """
    return None

  # Releasing and disabling are the very same no-op
  Disable = acquire
  release = acquire
class _PendingRequestMonitor(object):
  """Reports pending HTTP requests to a lock monitor.

  """
  # Name of the attribute holding the lock used by the decorators below
  _LOCK = "_lock"

  def __init__(self, owner, pending_fn):
    """Initializes this class.

    @param owner: Object whose C{getName} method provides the owner name
      reported for every pending request
    @type pending_fn: callable
    @param pending_fn: Function returning the currently pending requests

    """
    self._owner = owner
    self._pending_fn = pending_fn

    # The lock monitor runs in another thread, hence locking is necessary
    lock = locking.SharedLock("PendingHttpRequests")
    self._lock = lock
    self.acquire = lock.acquire
    self.release = lock.release

  @locking.ssynchronized(_LOCK)
  def Disable(self):
    """Disable monitor.

    Afterwards L{GetLockInfo} doesn't report any requests anymore.

    """
    self._pending_fn = None

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about pending requests.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: One tuple per pending request; empty once the monitor has been
      disabled

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for requests, hence all show up as
    # one item under "pending".
    pending_fn = self._pending_fn
    if not pending_fn:
      return []

    owner_name = self._owner.getName()
    entries = []

    for client in pending_fn():
      req = client.GetCurrentRequest()
      if not req:
        continue

      # Without a nice name, host and path are used to describe the request
      name = req.nicename
      if name is None:
        name = "%s%s" % (req.host, req.path)

      entries.append(("rpc/%s" % name, None, [owner_name], None))

    return entries
305
def _ProcessCurlRequests(multi, requests):
  """cURL request processor.

  This generator yields a tuple once for every completed request, successful or
  not. The first value in the tuple is the handle, the second an error message
  or C{None} for successful requests. A handle is removed from the multi
  object before it is yielded.

  @type multi: C{pycurl.CurlMulti}
  @param multi: cURL multi object
  @type requests: sequence
  @param requests: cURL request handles

  """
  for handle in requests:
    multi.add_handle(handle)

  while True:
    (status, num_active) = multi.perform()
    assert status in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

    # If cURL wants to be called again right away, skip straight to the next
    # call of multi.perform
    if status != pycurl.E_CALL_MULTI_PERFORM:
      # Read messages until cURL reports none are left
      more_messages = True
      while more_messages:
        (num_queued, finished_ok, finished_err) = multi.info_read()

        for handle in finished_ok:
          multi.remove_handle(handle)
          yield (handle, None)

        for (handle, errnum, errmsg) in finished_err:
          multi.remove_handle(handle)
          yield (handle, "Error %s: %s" % (errnum, errmsg))

        more_messages = (num_queued != 0)

      if num_active == 0:
        # No active handles anymore
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)
353
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
  """Processes any number of HTTP client requests.

  The outcome is stored on the request objects themselves; nothing is
  returned.

  @type requests: list of L{HttpClientRequest}
  @param requests: List of all requests, none of which may have been
    processed before
  @param lock_monitor_cb: Callable for registering with lock monitor
  @param _curl: Factory for cURL objects, called once per request
  @param _curl_multi: Factory for the cURL multi object
  @param _curl_process: Generator function driving the transfers, see
    L{_ProcessCurlRequests}

  """
  assert compat.all((req.error is None and
                     req.success is None and
                     req.resp_status_code is None and
                     req.resp_body is None)
                    for req in requests)

  # Prepare all requests, each on a cURL object of its own
  clients = [_StartRequest(_curl(), req) for req in requests]

  curl_to_client = {}
  for client in clients:
    curl_to_client[client.GetCurlHandle()] = client

  assert len(curl_to_client) == len(requests)

  if not lock_monitor_cb:
    monitor = _NoOpRequestMonitor
  else:
    monitor = _PendingRequestMonitor(threading.currentThread(),
                                     curl_to_client.values)
    lock_monitor_cb(monitor)

  # Process all requests and act based on the returned values
  multi = _curl_multi()
  handles = curl_to_client.keys()

  for (curl, msg) in _curl_process(multi, handles):
    monitor.acquire(shared=0)
    try:
      finished = curl_to_client.pop(curl)
      finished.Done(msg)
    finally:
      monitor.release()

  assert not curl_to_client, "Not all requests were processed"

  # Don't try to read information anymore as all requests have been processed
  monitor.Disable()

  assert compat.all(req.error is not None or
                    (req.success and
                     req.resp_status_code is not None and
                     req.resp_body is not None)
                    for req in requests)
403