Package ganeti :: Package http :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.http.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2007, 2008, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """HTTP client module. 
 22   
 23  """ 
 24   
 25  import logging 
 26  import pycurl 
 27  import threading 
 28  from cStringIO import StringIO 
 29   
 30  from ganeti import http 
 31  from ganeti import compat 
 32  from ganeti import netutils 
 33  from ganeti import locking 
class HttpClientRequest(object):
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None, nicename=None,
               completion_cb=None):
    """Sets up the description of a single HTTP request.

    @type host: string
    @param host: Name of the host to contact
    @type port: int
    @param port: Port number
    @type method: string
    @param method: Name of the HTTP method
    @type path: string
    @param path: Path to request; has to begin with a slash
    @type headers: list, dict or None
    @param headers: Extra headers to send; a list of header strings or, for
      the old interface, a dictionary mapping header names to values
    @type post_data: string or None
    @param post_data: Extra data to send along with the request
    @type read_timeout: int
    @param read_timeout: If given, timeout applied while the response is
      read from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Receives the cURL object so it can be configured
      before the request is made
    @type nicename: string
    @param nicename: User-presentable name describing this request (no
      whitespace)
    @type completion_cb: callable accepting this request object as a single
      parameter
    @param completion_cb: Invoked once the request has completed

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    for callback in (curl_config_fn, completion_cb):
      assert callback is None or callable(callback)

    # A missing payload is stored as an empty string
    if post_data is None:
      post_data = ""

    if isinstance(headers, dict):
      # Support for old interface: turn name/value pairs into header lines
      headers = ["%s: %s" % item for item in headers.items()]
    elif headers is None:
      headers = []

    # What to request and how
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.headers = headers
    self.post_data = post_data
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.nicename = nicename
    self.completion_cb = completion_cb

    # Outcome of the request; all unset until the request was processed
    self.success = None
    self.error = None

    # Data received from the server
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    cls = self.__class__
    description = " ".join([
      "%s.%s" % (cls.__module__, cls.__name__),
      "%s:%s" % (self.host, self.port),
      self.method,
      self.path,
      ])

    return "<%s at %#x>" % (description, id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    Hosts recognized as IP addresses are combined with the port by
    C{netutils.FormatAddress}, anything else is joined as C{host:port}.

    """
    if not netutils.IPAddress.IsValid(self.host):
      address = "%s:%s" % (self.host, self.port)
    else:
      address = netutils.FormatAddress((self.host, self.port))
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)
122
def _StartRequest(curl, req):
  """Configures a cURL object for a request and wraps it as pending.

  @type curl: pycurl.Curl
  @param curl: cURL object
  @type req: L{HttpClientRequest}
  @param req: HTTP request
  @rtype: L{_PendingRequest}
  @return: Object combining the cURL object, the request and a function
    returning the contents of the response buffer

  """
  logging.debug("Starting request %r", req)

  req_url = req.url
  req_method = req.method
  body = req.post_data
  req_headers = req.headers

  # PycURL requires strings to be non-unicode
  for value in (req_method, req_url, body):
    assert isinstance(value, str)
  assert compat.all(isinstance(hdr, str) for hdr in req_headers)

  # Everything cURL receives as response body is written to this buffer
  response = StringIO()

  # Basic configuration of the client for this request
  options = [
    (pycurl.VERBOSE, False),
    (pycurl.NOSIGNAL, True),
    (pycurl.USERAGENT, http.HTTP_GANETI_VERSION),
    (pycurl.PROXY, ""),
    (pycurl.CUSTOMREQUEST, str(req_method)),
    (pycurl.URL, req_url),
    (pycurl.POSTFIELDS, body),
    (pycurl.HTTPHEADER, req_headers),
    ]
  for (option, value) in options:
    curl.setopt(option, value)

  # Without a read timeout on the request a value of zero is configured
  timeout = 0
  if req.read_timeout is not None:
    timeout = int(req.read_timeout)
  curl.setopt(pycurl.TIMEOUT, timeout)

  # Disable SSL session ID caching (pycurl >= 7.16.0)
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

  curl.setopt(pycurl.WRITEFUNCTION, response.write)

  # The external config function goes last and hence sees all settings
  # made above
  if req.curl_config_fn:
    req.curl_config_fn(curl)

  return _PendingRequest(curl, req, response.getvalue)
175
176 177 -class _PendingRequest:
178 - def __init__(self, curl, req, resp_buffer_read):
179 """Initializes this class. 180 181 @type curl: pycurl.Curl 182 @param curl: cURL object 183 @type req: L{HttpClientRequest} 184 @param req: HTTP request 185 @type resp_buffer_read: callable 186 @param resp_buffer_read: Function to read response body 187 188 """ 189 assert req.success is None 190 191 self._curl = curl 192 self._req = req 193 self._resp_buffer_read = resp_buffer_read
194
195 - def GetCurlHandle(self):
196 """Returns the cURL object. 197 198 """ 199 return self._curl
200
201 - def GetCurrentRequest(self):
202 """Returns the current request. 203 204 """ 205 return self._req
206
207 - def Done(self, errmsg):
208 """Finishes a request. 209 210 @type errmsg: string or None 211 @param errmsg: Error message if request failed 212 213 """ 214 curl = self._curl 215 req = self._req 216 217 assert req.success is None, "Request has already been finalized" 218 219 logging.debug("Request %s finished, errmsg=%s", req, errmsg) 220 221 req.success = not bool(errmsg) 222 req.error = errmsg 223 224 # Get HTTP response code 225 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE) 226 req.resp_body = self._resp_buffer_read() 227 228 # Ensure no potentially large variables are referenced 229 curl.setopt(pycurl.POSTFIELDS, "") 230 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) 231 232 if req.completion_cb: 233 req.completion_cb(req)
234
235 236 -class _NoOpRequestMonitor: # pylint: disable=W0232
237 """No-op request monitor. 238 239 """ 240 @staticmethod
241 - def acquire(*args, **kwargs):
242 pass
243 244 release = acquire 245 Disable = acquire 246
class _PendingRequestMonitor:
  # Name of the attribute holding the lock used by the decorators below
  _LOCK = "_lock"

  def __init__(self, owner, pending_fn):
    """Initializes this class.

    @param owner: Object whose C{getName} method supplies the owner name
      reported for pending requests
    @type pending_fn: callable
    @param pending_fn: Returns the clients of all pending requests

    """
    self._owner = owner
    self._pending_fn = pending_fn

    # The lock monitor runs in another thread, hence locking is necessary
    lock = locking.SharedLock("PendingHttpRequests")
    self._lock = lock
    self.acquire = lock.acquire
    self.release = lock.release

  @locking.ssynchronized(_LOCK)
  def Disable(self):
    """Disables the monitor; no pending requests are reported afterwards.

    """
    self._pending_fn = None

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about pending requests.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: One tuple per pending request, made up of a name prefixed with
      C{rpc/}, C{None}, a list containing the owner's name and C{None}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for requests, hence all show up as
    # one item under "pending".
    result = []

    pending_fn = self._pending_fn
    if not pending_fn:
      # Monitor was disabled
      return result

    owner_name = self._owner.getName()

    for client in pending_fn():
      req = client.GetCurrentRequest()
      if not req:
        continue

      # Without a nice name host and path have to do
      name = req.nicename
      if name is None:
        name = "%s%s" % (req.host, req.path)

      result.append(("rpc/%s" % name, None, [owner_name], None))

    return result
296
def _ProcessCurlRequests(multi, requests):
  """cURL request processor.

  All handles are added to the multi object first. The generator then yields
  one tuple for every completed request, whether it succeeded or not: the
  first item is the handle, the second an error message or C{None} for
  successful requests. Handles are removed from the multi object before
  they are yielded.

  @type multi: C{pycurl.CurlMulti}
  @param multi: cURL multi object
  @type requests: sequence
  @param requests: cURL request handles

  """
  for handle in requests:
    multi.add_handle(handle)

  finished = False
  while not finished:
    (status, num_active) = multi.perform()
    assert status in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

    if status == pycurl.E_CALL_MULTI_PERFORM:
      # cURL wants to be called again
      continue

    # Read messages until cURL reports that none are left
    num_queued = None
    while num_queued != 0:
      (num_queued, succeeded, failed) = multi.info_read()

      for handle in succeeded:
        multi.remove_handle(handle)
        yield (handle, None)

      for (handle, errnum, errmsg) in failed:
        multi.remove_handle(handle)
        yield (handle, "Error %s: %s" % (errnum, errmsg))

    if num_active == 0:
      # No active handles anymore
      finished = True
    else:
      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)
344
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
  """Processes any number of HTTP client requests.

  The results are stored on the request objects themselves; nothing is
  returned.

  @type requests: list of L{HttpClientRequest}
  @param requests: List of all requests; none of them may have been
    processed before
  @param lock_monitor_cb: Callable for registering with lock monitor; if
    given, it is called with a monitor object reporting the pending requests
  @param _curl: Factory for cURL objects, called once per request
  @param _curl_multi: Factory for the cURL multi object
  @param _curl_process: Generator function processing the cURL handles, see
    L{_ProcessCurlRequests}

  """
  assert compat.all((req.error is None and
                     req.success is None and
                     req.resp_status_code is None and
                     req.resp_body is None)
                    for req in requests)

  # Prepare all requests, each one using a cURL object of its own
  clients = [_StartRequest(_curl(), req) for req in requests]
  curl_to_client = dict((client.GetCurlHandle(), client)
                        for client in clients)

  assert len(curl_to_client) == len(requests)

  monitor = _NoOpRequestMonitor
  if lock_monitor_cb:
    # The monitor looks up the pending clients in the dictionary whenever
    # it is asked for information
    monitor = _PendingRequestMonitor(threading.currentThread(),
                                     curl_to_client.values)
    lock_monitor_cb(monitor)

  # Process all requests and act based on the returned values
  for (handle, errmsg) in _curl_process(_curl_multi(),
                                        curl_to_client.keys()):
    monitor.acquire(shared=0)
    try:
      finished = curl_to_client.pop(handle)
      finished.Done(errmsg)
    finally:
      monitor.release()

  assert not curl_to_client, "Not all requests were processed"

  # Don't try to read information anymore as all requests have been processed
  monitor.Disable()

  assert compat.all(req.error is not None or
                    (req.success and
                     req.resp_status_code is not None and
                     req.resp_body is not None)
                    for req in requests)
394