1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 from cStringIO import StringIO
28
29 from ganeti import http
30 from ganeti import compat
31 from ganeti import netutils
35 - def __init__(self, host, port, method, path, headers=None, post_data=None,
36 read_timeout=None, curl_config_fn=None):
37 """Describes an HTTP request.
38
39 @type host: string
40 @param host: Hostname
41 @type port: int
42 @param port: Port
43 @type method: string
44 @param method: Method name
45 @type path: string
46 @param path: Request path
47 @type headers: list or None
48 @param headers: Additional headers to send, list of strings
49 @type post_data: string or None
50 @param post_data: Additional data to send
51 @type read_timeout: int
52 @param read_timeout: if passed, it will be used as the read
53 timeout while reading the response from the server
54 @type curl_config_fn: callable
55 @param curl_config_fn: Function to configure cURL object before request
56 (Note: if the function configures the connection in
57 a way where it wouldn't be efficient to reuse them,
58 a "identity" property should be defined, see
59 L{HttpClientRequest.identity})
60
61 """
62 assert path.startswith("/"), "Path must start with slash (/)"
63 assert curl_config_fn is None or callable(curl_config_fn)
64
65
66 self.host = host
67 self.port = port
68 self.method = method
69 self.path = path
70 self.read_timeout = read_timeout
71 self.curl_config_fn = curl_config_fn
72
73 if post_data is None:
74 self.post_data = ""
75 else:
76 self.post_data = post_data
77
78 if headers is None:
79 self.headers = []
80 elif isinstance(headers, dict):
81
82 self.headers = ["%s: %s" % (name, value)
83 for name, value in headers.items()]
84 else:
85 self.headers = headers
86
87
88 self.success = None
89 self.error = None
90
91
92 self.resp_status_code = None
93 self.resp_body = None
94
96 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97 "%s:%s" % (self.host, self.port),
98 self.method,
99 self.path]
100
101 return "<%s at %#x>" % (" ".join(status), id(self))
102
103 @property
105 """Returns the full URL for this requests.
106
107 """
108 if netutils.IPAddress.IsValid(self.host):
109 address = netutils.FormatAddress((self.host, self.port))
110 else:
111 address = "%s:%s" % (self.host, self.port)
112
113 return "https://%s%s" % (address, self.path)
114
115 @property
117 """Returns identifier for retrieving a pooled connection for this request.
118
119 This allows cURL client objects to be re-used and to cache information
120 (e.g. SSL session IDs or connections).
121
122 """
123 parts = [self.host, self.port]
124
125 if self.curl_config_fn:
126 try:
127 parts.append(self.curl_config_fn.identity)
128 except AttributeError:
129 pass
130
131 return "/".join(str(i) for i in parts)
132
136 """Initializes this class.
137
138 @type curl_config_fn: callable
139 @param curl_config_fn: Function to configure cURL object after
140 initialization
141
142 """
143 self._req = None
144
145 curl = self._CreateCurlHandle()
146 curl.setopt(pycurl.VERBOSE, False)
147 curl.setopt(pycurl.NOSIGNAL, True)
148 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149 curl.setopt(pycurl.PROXY, "")
150
151
152 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
154
155
156 if curl_config_fn:
157 curl_config_fn(curl)
158
159 self._curl = curl
160
161 @staticmethod
163 """Returns a new cURL object.
164
165 """
166 return pycurl.Curl()
167
169 """Returns the cURL object.
170
171 """
172 return self._curl
173
175 """Returns the current request.
176
177 @rtype: L{HttpClientRequest} or None
178
179 """
180 return self._req
181
183 """Starts a request on this client.
184
185 @type req: L{HttpClientRequest}
186 @param req: HTTP request
187
188 """
189 assert not self._req, "Another request is already started"
190
191 self._req = req
192 self._resp_buffer = StringIO()
193
194 url = req.url
195 method = req.method
196 post_data = req.post_data
197 headers = req.headers
198
199
200 assert isinstance(method, str)
201 assert isinstance(url, str)
202 assert isinstance(post_data, str)
203 assert compat.all(isinstance(i, str) for i in headers)
204
205
206 curl = self._curl
207 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
208 curl.setopt(pycurl.URL, url)
209 curl.setopt(pycurl.POSTFIELDS, post_data)
210 curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
211 curl.setopt(pycurl.HTTPHEADER, headers)
212
213 if req.read_timeout is None:
214 curl.setopt(pycurl.TIMEOUT, 0)
215 else:
216 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
217
218
219 if req.curl_config_fn:
220 req.curl_config_fn(curl)
221
222 - def Done(self, errmsg):
223 """Finishes a request.
224
225 @type errmsg: string or None
226 @param errmsg: Error message if request failed
227
228 """
229 req = self._req
230 assert req, "No request"
231
232 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
233
234 curl = self._curl
235
236 req.success = not bool(errmsg)
237 req.error = errmsg
238
239
240 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
241 req.resp_body = self._resp_buffer.getvalue()
242
243
244 self._req = None
245 self._resp_buffer = None
246
247
248 curl.setopt(pycurl.POSTFIELDS, "")
249 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
250
253 """Data structure for HTTP client pool.
254
255 """
257 """Initializes this class.
258
259 @type identity: string
260 @param identity: Client identifier for pool
261 @type client: L{_HttpClient}
262 @param client: HTTP client
263
264 """
265 self.identity = identity
266 self.client = client
267 self.lastused = 0
268
270 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
271 "id=%s" % self.identity,
272 "lastuse=%s" % self.lastused,
273 repr(self.client)]
274
275 return "<%s at %#x>" % (" ".join(status), id(self))
276
279 """A simple HTTP client pool.
280
281 Supports one pooled connection per identity (see
282 L{HttpClientRequest.identity}).
283
284 """
285
286 _MAX_GENERATIONS_DROP = 25
287
289 """Initializes this class.
290
291 @type curl_config_fn: callable
292 @param curl_config_fn: Function to configure cURL object after
293 initialization
294
295 """
296 self._curl_config_fn = curl_config_fn
297 self._generation = 0
298 self._pool = {}
299
300 @staticmethod
302 """Returns callable to create HTTP client.
303
304 """
305 return _HttpClient
306
307 - def _Get(self, identity):
308 """Gets an HTTP client from the pool.
309
310 @type identity: string
311 @param identity: Client identifier
312
313 """
314 try:
315 pclient = self._pool.pop(identity)
316 except KeyError:
317
318 client = self._GetHttpClientCreator()(self._curl_config_fn)
319 pclient = _PooledHttpClient(identity, client)
320 logging.debug("Created new client %s", pclient)
321 else:
322 logging.debug("Reusing client %s", pclient)
323
324 assert pclient.identity == identity
325
326 return pclient
327
329 """Starts a request.
330
331 @type req: L{HttpClientRequest}
332 @param req: HTTP request
333
334 """
335 logging.debug("Starting request %r", req)
336 pclient = self._Get(req.identity)
337
338 assert req.identity not in self._pool
339
340 pclient.client.StartRequest(req)
341 pclient.lastused = self._generation
342
343 return pclient
344
346 """Returns HTTP clients to the pool.
347
348 """
349 for pc in pclients:
350 logging.debug("Returning client %s to pool", pc)
351 assert pc.identity not in self._pool
352 assert pc not in self._pool.values()
353 self._pool[pc.identity] = pc
354
355
356 for pc in self._pool.values():
357 if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
358 logging.debug("Removing client %s which hasn't been used"
359 " for %s generations",
360 pc, self._MAX_GENERATIONS_DROP)
361 self._pool.pop(pc.identity, None)
362
363 assert compat.all(pc.lastused >= (self._generation -
364 self._MAX_GENERATIONS_DROP)
365 for pc in self._pool.values())
366
367 @staticmethod
369 """Creates new cURL multi handle.
370
371 """
372 return pycurl.CurlMulti()
373
375 """Processes any number of HTTP client requests using pooled objects.
376
377 @type requests: list of L{HttpClientRequest}
378 @param requests: List of all requests
379
380 """
381 multi = self._CreateCurlMultiHandle()
382
383
384 self._generation += 1
385
386 assert compat.all((req.error is None and
387 req.success is None and
388 req.resp_status_code is None and
389 req.resp_body is None)
390 for req in requests)
391
392 curl_to_pclient = {}
393 for req in requests:
394 pclient = self._StartRequest(req)
395 curl = pclient.client.GetCurlHandle()
396 curl_to_pclient[curl] = pclient
397 multi.add_handle(curl)
398 assert pclient.client.GetCurrentRequest() == req
399 assert pclient.lastused >= 0
400
401 assert len(curl_to_pclient) == len(requests)
402
403 done_count = 0
404 while True:
405 (ret, _) = multi.perform()
406 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
407
408 if ret == pycurl.E_CALL_MULTI_PERFORM:
409
410 continue
411
412 while True:
413 (remaining_messages, successful, failed) = multi.info_read()
414
415 for curl in successful:
416 multi.remove_handle(curl)
417 done_count += 1
418 pclient = curl_to_pclient[curl]
419 req = pclient.client.GetCurrentRequest()
420 pclient.client.Done(None)
421 assert req.success
422 assert not pclient.client.GetCurrentRequest()
423
424 for curl, errnum, errmsg in failed:
425 multi.remove_handle(curl)
426 done_count += 1
427 pclient = curl_to_pclient[curl]
428 req = pclient.client.GetCurrentRequest()
429 pclient.client.Done("Error %s: %s" % (errnum, errmsg))
430 assert req.error
431 assert not pclient.client.GetCurrentRequest()
432
433 if remaining_messages == 0:
434 break
435
436 assert done_count <= len(requests)
437
438 if done_count == len(requests):
439 break
440
441
442
443
444 multi.select(1.0)
445
446 assert compat.all(pclient.client.GetCurrentRequest() is None
447 for pclient in curl_to_pclient.values())
448
449
450 self._Return(curl_to_pclient.values())
451
452 assert done_count == len(requests)
453 assert compat.all(req.error is not None or
454 (req.success and
455 req.resp_status_code is not None and
456 req.resp_body is not None)
457 for req in requests)
458