1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 from cStringIO import StringIO
28
29 from ganeti import http
30 from ganeti import compat
31 from ganeti import netutils
def __init__(self, host, port, method, path, headers=None, post_data=None,
             read_timeout=None, curl_config_fn=None):
    """Builds the description of a single HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, has to begin with a slash
    @type headers: list, dict or None
    @param headers: Additional headers to send; either a list of header
        strings, which is stored as passed, or a dictionary, which is
        converted to a list of "name: value" strings
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an
        empty string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
        (Note: if the function configures the connection in a way where it
        wouldn't be efficient to reuse them, an "identity" property should
        be defined, see L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Bring the optional arguments into their stored form
    if post_data is None:
        post_data = ""

    if headers is None:
        header_list = []
    elif isinstance(headers, dict):
        # Each (name, value) pair becomes one header line
        header_list = ["%s: %s" % item for item in headers.items()]
    else:
        header_list = headers

    # Request parameters
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.post_data = post_data
    self.headers = header_list

    # Request status, not known until the request has been processed
    self.success = None
    self.error = None

    # Response fields, also unset until then
    self.resp_status_code = None
    self.resp_body = None
94
96 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97 "%s:%s" % (self.host, self.port),
98 self.method,
99 self.path]
100
101 return "<%s at %#x>" % (" ".join(status), id(self))
102
103 @property
105 """Returns the full URL for this requests.
106
107 """
108 if netutils.IPAddress.IsValid(self.host):
109 address = netutils.FormatAddress((self.host, self.port))
110 else:
111 address = "%s:%s" % (self.host, self.port)
112
113 return "https://%s%s" % (address, self.path)
114
115 @property
117 """Returns identifier for retrieving a pooled connection for this request.
118
119 This allows cURL client objects to be re-used and to cache information
120 (e.g. SSL session IDs or connections).
121
122 """
123 parts = [self.host, self.port]
124
125 if self.curl_config_fn:
126 try:
127 parts.append(self.curl_config_fn.identity)
128 except AttributeError:
129 pass
130
131 return "/".join(str(i) for i in parts)
132
136 """Initializes this class.
137
138 @type curl_config_fn: callable
139 @param curl_config_fn: Function to configure cURL object after
140 initialization
141
142 """
143 self._req = None
144
145 curl = self._CreateCurlHandle()
146 curl.setopt(pycurl.VERBOSE, False)
147 curl.setopt(pycurl.NOSIGNAL, True)
148 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149 curl.setopt(pycurl.PROXY, "")
150
151
152 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
154
155
156 if curl_config_fn:
157 curl_config_fn(curl)
158
159 self._curl = curl
160
161 @staticmethod
163 """Returns a new cURL object.
164
165 """
166 return pycurl.Curl()
167
169 """Returns the cURL object.
170
171 """
172 return self._curl
173
175 """Returns the current request.
176
177 @rtype: L{HttpClientRequest} or None
178
179 """
180 return self._req
181
183 """Starts a request on this client.
184
185 @type req: L{HttpClientRequest}
186 @param req: HTTP request
187
188 """
189 assert not self._req, "Another request is already started"
190
191 logging.debug("Starting request %r", req)
192
193 self._req = req
194 self._resp_buffer = StringIO()
195
196 url = req.url
197 method = req.method
198 post_data = req.post_data
199 headers = req.headers
200
201
202 assert isinstance(method, str)
203 assert isinstance(url, str)
204 assert isinstance(post_data, str)
205 assert compat.all(isinstance(i, str) for i in headers)
206
207
208 curl = self._curl
209 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
210 curl.setopt(pycurl.URL, url)
211 curl.setopt(pycurl.POSTFIELDS, post_data)
212 curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
213 curl.setopt(pycurl.HTTPHEADER, headers)
214
215 if req.read_timeout is None:
216 curl.setopt(pycurl.TIMEOUT, 0)
217 else:
218 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
219
220
221 if req.curl_config_fn:
222 req.curl_config_fn(curl)
223
def Done(self, errmsg):
    """Completes the request currently assigned to this client.

    The outcome and the response data are stored on the request object,
    after which the request and its response buffer are detached from the
    client.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    finished = self._req
    assert finished, "No request"

    logging.debug("Request %s finished, errmsg=%s", finished, errmsg)

    handle = self._curl

    # A missing or empty error message counts as success
    finished.success = not errmsg
    finished.error = errmsg

    # Response data as reported by the cURL handle and the write buffer
    finished.resp_status_code = handle.getinfo(pycurl.RESPONSE_CODE)
    finished.resp_body = self._resp_buffer.getvalue()

    # The client no longer has a request or a buffer
    self._req = None
    self._resp_buffer = None

    # Replace the POST data and write callback set on the handle so it does
    # not keep pointing at this request's data
    handle.setopt(pycurl.POSTFIELDS, "")
    handle.setopt(pycurl.WRITEFUNCTION, lambda _: None)
252
255 """Data structure for HTTP client pool.
256
257 """
259 """Initializes this class.
260
261 @type identity: string
262 @param identity: Client identifier for pool
263 @type client: L{_HttpClient}
264 @param client: HTTP client
265
266 """
267 self.identity = identity
268 self.client = client
269 self.lastused = 0
270
272 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
273 "id=%s" % self.identity,
274 "lastuse=%s" % self.lastused,
275 repr(self.client)]
276
277 return "<%s at %#x>" % (" ".join(status), id(self))
278
281 """A simple HTTP client pool.
282
283 Supports one pooled connection per identity (see
284 L{HttpClientRequest.identity}).
285
286 """
287
288 _MAX_GENERATIONS_DROP = 25
289
291 """Initializes this class.
292
293 @type curl_config_fn: callable
294 @param curl_config_fn: Function to configure cURL object after
295 initialization
296
297 """
298 self._curl_config_fn = curl_config_fn
299 self._generation = 0
300 self._pool = {}
301
302
303
304 self._logger = logging.getLogger(self.__class__.__name__)
305 self._logger.setLevel(logging.INFO)
306
307 @staticmethod
309 """Returns callable to create HTTP client.
310
311 """
312 return _HttpClient
313
def _Get(self, identity):
    """Takes the HTTP client for an identity out of the pool.

    If the pool holds no client for the identity, a new one is created.
    In both cases the returned client is not in the pool afterwards.

    @type identity: string
    @param identity: Client identifier

    """
    pool = self._pool

    if identity in pool:
        pooled = pool.pop(identity)
        self._logger.debug("Reusing client %s", pooled)
    else:
        # Nothing pooled for this identity, build a fresh client
        create_client = self._GetHttpClientCreator()
        pooled = _PooledHttpClient(identity,
                                   create_client(self._curl_config_fn))
        self._logger.debug("Created new client %s", pooled)

    assert pooled.identity == identity

    return pooled
334
336 """Starts a request.
337
338 @type req: L{HttpClientRequest}
339 @param req: HTTP request
340
341 """
342 pclient = self._Get(req.identity)
343
344 assert req.identity not in self._pool
345
346 pclient.client.StartRequest(req)
347 pclient.lastused = self._generation
348
349 return pclient
350
352 """Returns HTTP clients to the pool.
353
354 """
355 for pc in pclients:
356 self._logger.debug("Returning client %s to pool", pc)
357 assert pc.identity not in self._pool
358 assert pc not in self._pool.values()
359 self._pool[pc.identity] = pc
360
361
362 for pc in self._pool.values():
363 if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
364 self._logger.debug("Removing client %s which hasn't been used"
365 " for %s generations",
366 pc, self._MAX_GENERATIONS_DROP)
367 self._pool.pop(pc.identity, None)
368
369 assert compat.all(pc.lastused >= (self._generation -
370 self._MAX_GENERATIONS_DROP)
371 for pc in self._pool.values())
372
373 @staticmethod
375 """Creates new cURL multi handle.
376
377 """
378 return pycurl.CurlMulti()
379
381 """Processes any number of HTTP client requests using pooled objects.
382
383 @type requests: list of L{HttpClientRequest}
384 @param requests: List of all requests
385
386 """
387 multi = self._CreateCurlMultiHandle()
388
389
390 self._generation += 1
391
392 assert compat.all((req.error is None and
393 req.success is None and
394 req.resp_status_code is None and
395 req.resp_body is None)
396 for req in requests)
397
398 curl_to_pclient = {}
399 for req in requests:
400 pclient = self._StartRequest(req)
401 curl = pclient.client.GetCurlHandle()
402 curl_to_pclient[curl] = pclient
403 multi.add_handle(curl)
404 assert pclient.client.GetCurrentRequest() == req
405 assert pclient.lastused >= 0
406
407 assert len(curl_to_pclient) == len(requests)
408
409 done_count = 0
410 while True:
411 (ret, _) = multi.perform()
412 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
413
414 if ret == pycurl.E_CALL_MULTI_PERFORM:
415
416 continue
417
418 while True:
419 (remaining_messages, successful, failed) = multi.info_read()
420
421 for curl in successful:
422 multi.remove_handle(curl)
423 done_count += 1
424 pclient = curl_to_pclient[curl]
425 req = pclient.client.GetCurrentRequest()
426 pclient.client.Done(None)
427 assert req.success
428 assert not pclient.client.GetCurrentRequest()
429
430 for curl, errnum, errmsg in failed:
431 multi.remove_handle(curl)
432 done_count += 1
433 pclient = curl_to_pclient[curl]
434 req = pclient.client.GetCurrentRequest()
435 pclient.client.Done("Error %s: %s" % (errnum, errmsg))
436 assert req.error
437 assert not pclient.client.GetCurrentRequest()
438
439 if remaining_messages == 0:
440 break
441
442 assert done_count <= len(requests)
443
444 if done_count == len(requests):
445 break
446
447
448
449
450 multi.select(1.0)
451
452 assert compat.all(pclient.client.GetCurrentRequest() is None
453 for pclient in curl_to_pclient.values())
454
455
456 self._Return(curl_to_pclient.values())
457
458 assert done_count == len(requests)
459 assert compat.all(req.error is not None or
460 (req.success and
461 req.resp_status_code is not None and
462 req.resp_body is not None)
463 for req in requests)
464