1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 from cStringIO import StringIO
28
29 from ganeti import http
30 from ganeti import compat
def __init__(self, host, port, method, path, headers=None, post_data=None,
             read_timeout=None, curl_config_fn=None):
    """Sets up the description of a single HTTP request.

    Besides the request parameters, the result attributes (C{success},
    C{error}, C{resp_status_code} and C{resp_body}) are created here and
    initialized to C{None}.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must begin with a slash
    @type headers: list, dict or None
    @param headers: Additional headers to send; either a list of strings
                    (stored as given) or a dictionary, which is converted
                    to a list of "name: value" strings
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an
                      empty string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
                         timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection
                           in a way where it wouldn't be efficient to reuse
                           them, an "identity" property should be defined,
                           see L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Request parameters, stored as passed in
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    # A missing body is represented by the empty string
    self.post_data = "" if post_data is None else post_data

    # Normalize headers: dictionaries become "name: value" strings, a
    # missing value becomes an empty list, anything else is kept untouched
    if isinstance(headers, dict):
        headers = ["%s: %s" % (key, val) for key, val in headers.items()]
    elif headers is None:
        headers = []
    self.headers = headers

    # Outcome of the request, not known yet
    self.success = self.error = None

    # Response data, not known yet
    self.resp_status_code = self.resp_body = None
93
95 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
96 "%s:%s" % (self.host, self.port),
97 self.method,
98 self.path]
99
100 return "<%s at %#x>" % (" ".join(status), id(self))
101
102 @property
104 """Returns the full URL for this request.
105
106 """
107
108 return "https://%s:%s%s" % (self.host, self.port, self.path)
109
110 @property
112 """Returns identifier for retrieving a pooled connection for this request.
113
114 This allows cURL client objects to be re-used and to cache information
115 (e.g. SSL session IDs or connections).
116
117 """
118 parts = [self.host, self.port]
119
120 if self.curl_config_fn:
121 try:
122 parts.append(self.curl_config_fn.identity)
123 except AttributeError:
124 pass
125
126 return "/".join(str(i) for i in parts)
127
131 """Initializes this class.
132
133 @type curl_config_fn: callable
134 @param curl_config_fn: Function to configure cURL object after
135 initialization
136
137 """
138 self._req = None
139
140 curl = self._CreateCurlHandle()
141 curl.setopt(pycurl.VERBOSE, False)
142 curl.setopt(pycurl.NOSIGNAL, True)
143 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
144 curl.setopt(pycurl.PROXY, "")
145
146
147 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
148 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
149
150
151 if curl_config_fn:
152 curl_config_fn(curl)
153
154 self._curl = curl
155
156 @staticmethod
158 """Returns a new cURL object.
159
160 """
161 return pycurl.Curl()
162
164 """Returns the cURL object.
165
166 """
167 return self._curl
168
170 """Returns the current request.
171
172 @rtype: L{HttpClientRequest} or None
173
174 """
175 return self._req
176
178 """Starts a request on this client.
179
180 @type req: L{HttpClientRequest}
181 @param req: HTTP request
182
183 """
184 assert not self._req, "Another request is already started"
185
186 self._req = req
187 self._resp_buffer = StringIO()
188
189 url = req.url
190 method = req.method
191 post_data = req.post_data
192 headers = req.headers
193
194
195 assert isinstance(method, str)
196 assert isinstance(url, str)
197 assert isinstance(post_data, str)
198 assert compat.all(isinstance(i, str) for i in headers)
199
200
201 curl = self._curl
202 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
203 curl.setopt(pycurl.URL, url)
204 curl.setopt(pycurl.POSTFIELDS, post_data)
205 curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
206 curl.setopt(pycurl.HTTPHEADER, headers)
207
208 if req.read_timeout is None:
209 curl.setopt(pycurl.TIMEOUT, 0)
210 else:
211 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
212
213
214 if req.curl_config_fn:
215 req.curl_config_fn(curl)
216
def Done(self, errmsg):
    """Completes the request currently attached to this client.

    The outcome (success flag and error message), the HTTP status code
    reported by cURL and the buffered response body are stored on the
    request object; afterwards the client is detached from the request.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    request = self._req
    assert request, "No request"

    logging.debug("Request %s finished, errmsg=%s", request, errmsg)

    handle = self._curl

    # An empty or missing error message means the request succeeded
    request.success = not errmsg
    request.error = errmsg

    # Copy status code and collected body over to the request
    request.resp_status_code = handle.getinfo(pycurl.RESPONSE_CODE)
    request.resp_body = self._resp_buffer.getvalue()

    # Detach the client from the finished request and its buffer
    self._req = self._resp_buffer = None

    # Replace the POST payload and the write callback with empty
    # stand-ins (presumably so the handle no longer keeps the old request
    # data and response buffer referenced)
    handle.setopt(pycurl.POSTFIELDS, "")
    handle.setopt(pycurl.WRITEFUNCTION, lambda _: None)
245
248 """Data structure for HTTP client pool.
249
250 """
252 """Initializes this class.
253
254 @type identity: string
255 @param identity: Client identifier for pool
256 @type client: L{_HttpClient}
257 @param client: HTTP client
258
259 """
260 self.identity = identity
261 self.client = client
262 self.lastused = 0
263
265 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
266 "id=%s" % self.identity,
267 "lastuse=%s" % self.lastused,
268 repr(self.client)]
269
270 return "<%s at %#x>" % (" ".join(status), id(self))
271
274 """A simple HTTP client pool.
275
276 Supports one pooled connection per identity (see
277 L{HttpClientRequest.identity}).
278
279 """
280
281 _MAX_GENERATIONS_DROP = 25
282
284 """Initializes this class.
285
286 @type curl_config_fn: callable
287 @param curl_config_fn: Function to configure cURL object after
288 initialization
289
290 """
291 self._curl_config_fn = curl_config_fn
292 self._generation = 0
293 self._pool = {}
294
295 @staticmethod
297 """Returns callable to create HTTP client.
298
299 """
300 return _HttpClient
301
def _Get(self, identity):
    """Takes the HTTP client for an identity out of the pool.

    If the pool holds no client for the identity, a new one is created
    using the callable returned by C{_GetHttpClientCreator}. In either
    case the returned client is not (or no longer) part of the pool.

    @type identity: string
    @param identity: Client identifier

    """
    if identity in self._pool:
        pooled = self._pool.pop(identity)
        logging.debug("Reusing client %s", pooled)
    else:
        # Nothing pooled under this identity, build a fresh client
        make_client = self._GetHttpClientCreator()
        new_client = make_client(self._curl_config_fn)
        pooled = _PooledHttpClient(identity, new_client)
        logging.debug("Created new client %s", pooled)

    assert pooled.identity == identity

    return pooled
322
324 """Starts a request.
325
326 @type req: L{HttpClientRequest}
327 @param req: HTTP request
328
329 """
330 logging.debug("Starting request %r", req)
331 pclient = self._Get(req.identity)
332
333 assert req.identity not in self._pool
334
335 pclient.client.StartRequest(req)
336 pclient.lastused = self._generation
337
338 return pclient
339
341 """Returns HTTP clients to the pool.
342
343 """
344 for pc in pclients:
345 logging.debug("Returning client %s to pool", pc)
346 assert pc.identity not in self._pool
347 assert pc not in self._pool.values()
348 self._pool[pc.identity] = pc
349
350
351 for pc in self._pool.values():
352 if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
353 logging.debug("Removing client %s which hasn't been used"
354 " for %s generations",
355 pc, self._MAX_GENERATIONS_DROP)
356 self._pool.pop(pc.identity, None)
357
358 assert compat.all(pc.lastused >= (self._generation -
359 self._MAX_GENERATIONS_DROP)
360 for pc in self._pool.values())
361
362 @staticmethod
364 """Creates new cURL multi handle.
365
366 """
367 return pycurl.CurlMulti()
368
370 """Processes any number of HTTP client requests using pooled objects.
371
372 @type requests: list of L{HttpClientRequest}
373 @param requests: List of all requests
374
375 """
376 multi = self._CreateCurlMultiHandle()
377
378
379 self._generation += 1
380
381 assert compat.all((req.error is None and
382 req.success is None and
383 req.resp_status_code is None and
384 req.resp_body is None)
385 for req in requests)
386
387 curl_to_pclient = {}
388 for req in requests:
389 pclient = self._StartRequest(req)
390 curl = pclient.client.GetCurlHandle()
391 curl_to_pclient[curl] = pclient
392 multi.add_handle(curl)
393 assert pclient.client.GetCurrentRequest() == req
394 assert pclient.lastused >= 0
395
396 assert len(curl_to_pclient) == len(requests)
397
398 done_count = 0
399 while True:
400 (ret, _) = multi.perform()
401 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
402
403 if ret == pycurl.E_CALL_MULTI_PERFORM:
404
405 continue
406
407 while True:
408 (remaining_messages, successful, failed) = multi.info_read()
409
410 for curl in successful:
411 multi.remove_handle(curl)
412 done_count += 1
413 pclient = curl_to_pclient[curl]
414 req = pclient.client.GetCurrentRequest()
415 pclient.client.Done(None)
416 assert req.success
417 assert not pclient.client.GetCurrentRequest()
418
419 for curl, errnum, errmsg in failed:
420 multi.remove_handle(curl)
421 done_count += 1
422 pclient = curl_to_pclient[curl]
423 req = pclient.client.GetCurrentRequest()
424 pclient.client.Done("Error %s: %s" % (errnum, errmsg))
425 assert req.error
426 assert not pclient.client.GetCurrentRequest()
427
428 if remaining_messages == 0:
429 break
430
431 assert done_count <= len(requests)
432
433 if done_count == len(requests):
434 break
435
436
437
438
439 multi.select(1.0)
440
441 assert compat.all(pclient.client.GetCurrentRequest() is None
442 for pclient in curl_to_pclient.values())
443
444
445 self._Return(curl_to_pclient.values())
446
447 assert done_count == len(requests)
448 assert compat.all(req.error is not None or
449 (req.success and
450 req.resp_status_code is not None and
451 req.resp_body is not None)
452 for req in requests)
453