1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 import threading
28 from cStringIO import StringIO
29
30 from ganeti import http
31 from ganeti import compat
32 from ganeti import netutils
33 from ganeti import locking
def __init__(self, host, port, method, path, headers=None, post_data=None,
             read_timeout=None, curl_config_fn=None, nicename=None,
             completion_cb=None):
    """Sets up the description of a single HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, has to begin with a slash
    @type headers: list, dict or None
    @param headers: Additional headers to send; either a list of complete
      header strings or a dictionary, which is converted to a list of
      "name: value" strings
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an empty
      string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read timeout while
      reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
    @type nicename: string
    @param nicename: Name, presentable to a user, to describe this request (no
      whitespace)
    @type completion_cb: callable accepting this request object as a single
      parameter
    @param completion_cb: Callback for request completion

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    # Both hooks are optional, but when given they must be callable
    for hook in (curl_config_fn, completion_cb):
        assert hook is None or callable(hook)

    # Request parameters, kept exactly as passed in
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.nicename = nicename
    self.completion_cb = completion_cb

    # A missing body is represented by the empty string
    self.post_data = "" if post_data is None else post_data

    # Headers are always stored as a list of strings
    if isinstance(headers, dict):
        header_lines = ["%s: %s" % item for item in headers.items()]
    elif headers is None:
        header_lines = []
    else:
        header_lines = headers
    self.headers = header_lines

    # Outcome of the request, unknown until it has been processed
    self.success = self.error = None

    # Response data, unknown until it has been processed
    self.resp_status_code = self.resp_body = None
102
104 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
105 "%s:%s" % (self.host, self.port),
106 self.method,
107 self.path]
108
109 return "<%s at %#x>" % (" ".join(status), id(self))
110
111 @property
113 """Returns the full URL for this requests.
114
115 """
116 if netutils.IPAddress.IsValid(self.host):
117 address = netutils.FormatAddress((self.host, self.port))
118 else:
119 address = "%s:%s" % (self.host, self.port)
120
121 return "https://%s%s" % (address, self.path)
122
125 """Starts a request on a cURL object.
126
127 @type curl: pycurl.Curl
128 @param curl: cURL object
129 @type req: L{HttpClientRequest}
130 @param req: HTTP request
131
132 """
133 logging.debug("Starting request %r", req)
134
135 url = req.url
136 method = req.method
137 post_data = req.post_data
138 headers = req.headers
139
140
141 assert isinstance(method, str)
142 assert isinstance(url, str)
143 assert isinstance(post_data, str)
144 assert compat.all(isinstance(i, str) for i in headers)
145
146
147 resp_buffer = StringIO()
148
149
150 curl.setopt(pycurl.VERBOSE, False)
151 curl.setopt(pycurl.NOSIGNAL, True)
152 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
153 curl.setopt(pycurl.PROXY, "")
154 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
155 curl.setopt(pycurl.URL, url)
156 curl.setopt(pycurl.POSTFIELDS, post_data)
157 curl.setopt(pycurl.HTTPHEADER, headers)
158
159 if req.read_timeout is None:
160 curl.setopt(pycurl.TIMEOUT, 0)
161 else:
162 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
163
164
165 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
166 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
167
168 curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
169
170
171 if req.curl_config_fn:
172 req.curl_config_fn(curl)
173
174 return _PendingRequest(curl, req, resp_buffer.getvalue)
175
def __init__(self, curl, req, resp_buffer_read):
    """Ties a cURL object to the request it is used for.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request; must not have an outcome recorded yet
    @type resp_buffer_read: callable
    @param resp_buffer_read: Function to read response body

    """
    # A request whose "success" attribute is already set must not be reused
    assert req.success is None

    (self._curl, self._req, self._resp_buffer_read) = \
        (curl, req, resp_buffer_read)
194
196 """Returns the cURL object.
197
198 """
199 return self._curl
200
202 """Returns the current request.
203
204 """
205 return self._req
206
def Done(self, errmsg):
    """Records the outcome of the request and notifies its owner.

    The request object receives the success flag, the error message, the
    response status code and the response body. Afterwards the completion
    callback, if the request has one, is called with the request object.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    (handle, request) = (self._curl, self._req)

    assert request.success is None, "Request has already been finalized"

    logging.debug("Request %s finished, errmsg=%s", request, errmsg)

    # Any non-empty error message marks the request as failed
    request.error = errmsg
    request.success = not errmsg

    # Collect the response from the cURL object and the response buffer
    request.resp_status_code = handle.getinfo(pycurl.RESPONSE_CODE)
    request.resp_body = self._resp_buffer_read()

    # Replace the request body and the write callback on the cURL object with
    # empty stand-ins; presumably so it no longer refers to this request's
    # data
    handle.setopt(pycurl.POSTFIELDS, "")
    handle.setopt(pycurl.WRITEFUNCTION, lambda _: None)

    notify = request.completion_cb
    if notify:
        notify(request)
234
237 """No-op request monitor.
238
239 """
240 @staticmethod
243
244 release = acquire
245 Disable = acquire
246
249 _LOCK = "_lock"
250
252 """Initializes this class.
253
254 """
255 self._owner = owner
256 self._pending_fn = pending_fn
257
258
259 self._lock = locking.SharedLock("PendingHttpRequests")
260 self.acquire = self._lock.acquire
261 self.release = self._lock.release
262
263 @locking.ssynchronized(_LOCK)
265 """Disable monitor.
266
267 """
268 self._pending_fn = None
269
270 @locking.ssynchronized(_LOCK, shared=1)
272 """Retrieves information about pending requests.
273
274 @type requested: set
275 @param requested: Requested information, see C{query.LQ_*}
276
277 """
278
279
280
281 result = []
282
283 if self._pending_fn:
284 owner_name = self._owner.getName()
285
286 for client in self._pending_fn():
287 req = client.GetCurrentRequest()
288 if req:
289 if req.nicename is None:
290 name = "%s%s" % (req.host, req.path)
291 else:
292 name = req.nicename
293 result.append(("rpc/%s" % name, None, [owner_name], None))
294
295 return result
296
299 """cURL request processor.
300
301 This generator yields a tuple once for every completed request, successful or
302 not. The first value in the tuple is the handle, the second an error message
303 or C{None} for successful requests.
304
305 @type multi: C{pycurl.CurlMulti}
306 @param multi: cURL multi object
307 @type requests: sequence
308 @param requests: cURL request handles
309
310 """
311 for curl in requests:
312 multi.add_handle(curl)
313
314 while True:
315 (ret, active) = multi.perform()
316 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
317
318 if ret == pycurl.E_CALL_MULTI_PERFORM:
319
320 continue
321
322 while True:
323 (remaining_messages, successful, failed) = multi.info_read()
324
325 for curl in successful:
326 multi.remove_handle(curl)
327 yield (curl, None)
328
329 for curl, errnum, errmsg in failed:
330 multi.remove_handle(curl)
331 yield (curl, "Error %s: %s" % (errnum, errmsg))
332
333 if remaining_messages == 0:
334 break
335
336 if active == 0:
337
338 break
339
340
341
342
343 multi.select(1.0)
344
349 """Processes any number of HTTP client requests.
350
351 @type requests: list of L{HttpClientRequest}
352 @param requests: List of all requests
353 @param lock_monitor_cb: Callable for registering with lock monitor
354
355 """
356 assert compat.all((req.error is None and
357 req.success is None and
358 req.resp_status_code is None and
359 req.resp_body is None)
360 for req in requests)
361
362
363 curl_to_client = \
364 dict((client.GetCurlHandle(), client)
365 for client in map(lambda req: _StartRequest(_curl(), req), requests))
366
367 assert len(curl_to_client) == len(requests)
368
369 if lock_monitor_cb:
370 monitor = _PendingRequestMonitor(threading.currentThread(),
371 curl_to_client.values)
372 lock_monitor_cb(monitor)
373 else:
374 monitor = _NoOpRequestMonitor
375
376
377 for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
378 monitor.acquire(shared=0)
379 try:
380 curl_to_client.pop(curl).Done(msg)
381 finally:
382 monitor.release()
383
384 assert not curl_to_client, "Not all requests were processed"
385
386
387 monitor.Disable()
388
389 assert compat.all(req.error is not None or
390 (req.success and
391 req.resp_status_code is not None and
392 req.resp_body is not None)
393 for req in requests)
394