1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """HTTP client module.
31
32 """
33
34 import logging
35 import pycurl
36 import threading
37 from cStringIO import StringIO
38
39 from ganeti import http
40 from ganeti import compat
41 from ganeti import netutils
42 from ganeti import locking
class HttpClientRequest(object):
    """Describes one HTTP request and, once processed, its outcome.

    """
    # NOTE(review): the "class" line and the "def" lines of __repr__ and url
    # were absent from the reviewed text and have been restored. The class
    # name comes from the L{HttpClientRequest} references in this module; the
    # "object" base class is an assumption -- confirm against the original.

    def __init__(self, host, port, method, path, headers=None,
                 post_data=None, read_timeout=None, curl_config_fn=None,
                 nicename=None, completion_cb=None):
        """Describes an HTTP request.

        @type host: string
        @param host: Hostname
        @type port: int
        @param port: Port
        @type method: string
        @param method: Method name
        @type path: string
        @param path: Request path, must start with a slash
        @type headers: list, dict or None
        @param headers: Additional headers to send; either a list of
          ready-made "Name: value" strings or a dictionary which is
          converted to such a list
        @type post_data: string or None
        @param post_data: Additional data to send; C{None} is stored as an
          empty string
        @type read_timeout: int
        @param read_timeout: if passed, it will be used as the read
          timeout while reading the response from the server
        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object before
          request
        @type nicename: string
        @param nicename: Name, presentable to a user, to describe this
          request (no whitespace)
        @type completion_cb: callable accepting this request object as a
          single parameter
        @param completion_cb: Callback for request completion

        """
        assert path.startswith("/"), "Path must start with slash (/)"
        assert curl_config_fn is None or callable(curl_config_fn)
        assert completion_cb is None or callable(completion_cb)

        # Request details
        self.host = host
        self.port = port
        self.method = method
        self.path = path
        self.read_timeout = read_timeout
        self.curl_config_fn = curl_config_fn
        self.nicename = nicename
        self.completion_cb = completion_cb

        if post_data is None:
            self.post_data = ""
        else:
            self.post_data = post_data

        if headers is None:
            self.headers = []
        elif isinstance(headers, dict):
            # Dictionaries are flattened into "Name: value" strings
            self.headers = ["%s: %s" % (name, value)
                            for name, value in headers.items()]
        else:
            self.headers = headers

        # Outcome of the request; all of these stay None until the request
        # has been processed
        self.success = None
        self.error = None

        # Response data
        self.resp_status_code = None
        self.resp_body = None

    def __repr__(self):
        """Returns a short description naming class, target, method and path.

        """
        status = ["%s.%s" % (self.__class__.__module__,
                             self.__class__.__name__),
                  "%s:%s" % (self.host, self.port),
                  self.method,
                  self.path]

        return "<%s at %#x>" % (" ".join(status), id(self))

    @property
    def url(self):
        """Returns the full URL for this request.

        The scheme is always C{https}. Hosts recognized as IP addresses are
        formatted through C{netutils.FormatAddress} (presumably to handle
        IPv6 literals -- confirm), anything else is joined as C{host:port}.

        """
        if netutils.IPAddress.IsValid(self.host):
            address = netutils.FormatAddress((self.host, self.port))
        else:
            address = "%s:%s" % (self.host, self.port)

        return "https://%s%s" % (address, self.path)
131
def _StartRequest(curl, req):
    """Starts a request on a cURL object.

    Configures the handle from the request's attributes and wires up a
    buffer collecting the response body. The transfer itself is not
    performed here.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PendingRequest}
    @return: Object tying together the handle, the request and the reader
      for the response buffer

    """
    logging.debug("Starting request %r", req)

    url = req.url
    method = req.method
    post_data = req.post_data
    headers = req.headers

    # PycURL is handed plain strings only
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(i, str) for i in headers)

    # Buffer for the response body
    resp_buffer = StringIO()

    # Configure the handle for this request
    curl.setopt(pycurl.VERBOSE, False)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    # An empty proxy string disables proxy usage in libcurl
    curl.setopt(pycurl.PROXY, "")
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.POSTFIELDS, post_data)
    curl.setopt(pycurl.HTTPHEADER, headers)

    if req.read_timeout is None:
        # libcurl treats a timeout of zero as "never time out"
        curl.setopt(pycurl.TIMEOUT, 0)
    else:
        curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

    # The option is only set if this pycurl build knows about it
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
        curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)

    # The per-request hook runs last and can therefore override any of the
    # options set above
    if req.curl_config_fn:
        req.curl_config_fn(curl)

    return _PendingRequest(curl, req, resp_buffer.getvalue)
184
187 - def __init__(self, curl, req, resp_buffer_read):
188 """Initializes this class.
189
190 @type curl: pycurl.Curl
191 @param curl: cURL object
192 @type req: L{HttpClientRequest}
193 @param req: HTTP request
194 @type resp_buffer_read: callable
195 @param resp_buffer_read: Function to read response body
196
197 """
198 assert req.success is None
199
200 self._curl = curl
201 self._req = req
202 self._resp_buffer_read = resp_buffer_read
203
205 """Returns the cURL object.
206
207 """
208 return self._curl
209
211 """Returns the current request.
212
213 """
214 return self._req
215
216 - def Done(self, errmsg):
217 """Finishes a request.
218
219 @type errmsg: string or None
220 @param errmsg: Error message if request failed
221
222 """
223 curl = self._curl
224 req = self._req
225
226 assert req.success is None, "Request has already been finalized"
227
228 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
229
230 req.success = not bool(errmsg)
231 req.error = errmsg
232
233
234 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
235 req.resp_body = self._resp_buffer_read()
236
237
238 curl.setopt(pycurl.POSTFIELDS, "")
239 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
240
241 if req.completion_cb:
242 req.completion_cb(req)
243
246 """No-op request monitor.
247
248 """
249 @staticmethod
252
253 release = acquire
254 Disable = acquire
255
class _PendingRequestMonitor(object):
    """Reports the requests currently pending for one owner.

    """
    # NOTE(review): the "class" line and all three "def" lines were absent
    # from the reviewed text. The class name, __init__ parameters and Disable
    # are taken from their uses in this module. The name "GetLockInfo" does
    # not occur anywhere in the reviewed text and is an assumption --
    # confirm against the original file and the lock monitor interface.

    # Name of the attribute holding the lock used by the decorators below
    _LOCK = "_lock"

    def __init__(self, owner, pending_fn):
        """Initializes this class.

        @param owner: Object providing C{getName()}; its name is reported
          as the owner of every pending request
        @type pending_fn: callable
        @param pending_fn: Function returning the pending request clients

        """
        self._owner = owner
        self._pending_fn = pending_fn

        # The lock's acquire/release are re-exported so callers can guard
        # their own updates with the same lock
        self._lock = locking.SharedLock("PendingHttpRequests")
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    @locking.ssynchronized(_LOCK)
    def Disable(self):
        """Disable monitor.

        Afterwards no pending requests are reported anymore.

        """
        self._pending_fn = None

    @locking.ssynchronized(_LOCK, shared=1)
    def GetLockInfo(self, requested):
        """Retrieves information about pending requests.

        @type requested: set
        @param requested: Requested information, see C{query.LQ_*}; not
          evaluated by this implementation
        @rtype: list of tuples
        @return: One C{(name, None, [owner name], None)} tuple per pending
          request

        """
        result = []

        if self._pending_fn:
            owner_name = self._owner.getName()

            for client in self._pending_fn():
                req = client.GetCurrentRequest()
                if req:
                    # Fall back to host and path for unnamed requests
                    if req.nicename is None:
                        name = "%s%s" % (req.host, req.path)
                    else:
                        name = req.nicename
                    result.append(("rpc/%s" % name, None, [owner_name],
                                   None))

        return result
305
# NOTE(review): the "def" lines of both functions below were absent from the
# reviewed text. Parameter names are taken from the docstrings and bodies;
# the function names and the default values of the underscore-prefixed
# parameters are assumptions -- confirm against the original file.

def _ProcessCurlRequests(multi, requests):
    """cURL request processor.

    This generator yields a tuple once for every completed request,
    successful or not. The first value in the tuple is the handle, the
    second an error message or C{None} for successful requests.

    @type multi: C{pycurl.CurlMulti}
    @param multi: cURL multi object
    @type requests: sequence
    @param requests: cURL request handles

    """
    for curl in requests:
        multi.add_handle(curl)

    while True:
        (ret, active) = multi.perform()
        assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # cURL asks for perform() to be called again right away
            continue

        # Drain all completion messages available at this point
        while True:
            (remaining_messages, successful, failed) = multi.info_read()

            for curl in successful:
                multi.remove_handle(curl)
                yield (curl, None)

            for curl, errnum, errmsg in failed:
                multi.remove_handle(curl)
                yield (curl, "Error %s: %s" % (errnum, errmsg))

            if remaining_messages == 0:
                break

        if active == 0:
            # No transfers left
            break

        # Wait for activity on the handles, at most one second, before
        # calling perform() again
        multi.select(1.0)


def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
    """Processes any number of HTTP client requests.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests
    @param lock_monitor_cb: Callable for registering with lock monitor
    @param _curl: Factory for cURL handles, called once per request
    @param _curl_multi: Factory for the cURL multi object
    @param _curl_process: Generator function driving the transfers, called
      with the multi object and the handles

    """
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    # One cURL handle per request, mapped to its pending-request object
    pending = (_StartRequest(_curl(), req) for req in requests)
    curl_to_client = dict((client.GetCurlHandle(), client)
                          for client in pending)

    assert len(curl_to_client) == len(requests)

    if lock_monitor_cb:
        monitor = _PendingRequestMonitor(threading.currentThread(),
                                         curl_to_client.values)
        lock_monitor_cb(monitor)
    else:
        monitor = _NoOpRequestMonitor

    # The handles are snapshotted into a list because entries are removed
    # from the dictionary while results are coming in
    handles = list(curl_to_client.keys())

    for (curl, msg) in _curl_process(_curl_multi(), handles):
        # The monitor reads the dictionary, hence it is only modified while
        # holding the monitor's lock exclusively
        monitor.acquire(shared=0)
        try:
            curl_to_client.pop(curl).Done(msg)
        finally:
            monitor.release()

    assert not curl_to_client, "Not all requests were processed"

    # Stop reporting pending requests
    monitor.Disable()

    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)
403