1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """HTTP client module.
31
32 """
33
34 import logging
35 import pycurl
36 import threading
37 from cStringIO import StringIO
38
39 from ganeti import http
40 from ganeti import compat
41 from ganeti import netutils
42 from ganeti import locking
def __init__(self, host, port, method, path, headers=None, post_data=None,
             read_timeout=None, curl_config_fn=None, nicename=None,
             completion_cb=None):
    """Builds the description of a single HTTP request.

    Besides storing the request parameters, the result fields (success,
    error, response status code and response body) are initialised to
    C{None}.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings; a
      dictionary is converted to a list of "name: value" strings
    @type post_data: string or None
    @param post_data: Additional data to send, C{None} is stored as an empty
      string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
      timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
    @type nicename: string
    @param nicename: Name, presentable to a user, to describe this request
      (no whitespace)
    @type completion_cb: callable accepting this request object as a single
      parameter
    @param completion_cb: Callback for request completion

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    # Both hooks are optional, but must be callable when given
    for hook in (curl_config_fn, completion_cb):
        assert hook is None or callable(hook)

    # Normalise the optional payload
    if post_data is not None:
        body = post_data
    else:
        body = ""

    # Normalise the headers into a list of strings
    if isinstance(headers, dict):
        header_lines = ["%s: %s" % (key, val)
                        for (key, val) in headers.items()]
    elif headers is None:
        header_lines = []
    else:
        header_lines = headers

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.headers = header_lines
    self.post_data = body
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.nicename = nicename
    self.completion_cb = completion_cb

    # Result attributes, filled in once the request has been processed
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None
111
113 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
114 "%s:%s" % (self.host, self.port),
115 self.method,
116 self.path]
117
118 return "<%s at %#x>" % (" ".join(status), id(self))
119
120 @property
122 """Returns the full URL for this requests.
123
124 """
125 if netutils.IPAddress.IsValid(self.host):
126 address = netutils.FormatAddress((self.host, self.port))
127 else:
128 address = "%s:%s" % (self.host, self.port)
129
130 return "https://%s%s" % (address, self.path)
131
134 """Starts a request on a cURL object.
135
136 @type curl: pycurl.Curl
137 @param curl: cURL object
138 @type req: L{HttpClientRequest}
139 @param req: HTTP request
140
141 """
142 logging.debug("Starting request %r", req)
143
144 url = req.url
145 method = req.method
146 post_data = req.post_data
147 headers = req.headers
148
149
150 assert isinstance(method, str)
151 assert isinstance(url, str)
152 assert isinstance(post_data, str)
153 assert compat.all(isinstance(i, str) for i in headers)
154
155
156 resp_buffer = StringIO()
157
158
159 curl.setopt(pycurl.VERBOSE, False)
160 curl.setopt(pycurl.NOSIGNAL, True)
161 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
162 curl.setopt(pycurl.PROXY, "")
163 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
164 curl.setopt(pycurl.URL, url)
165 curl.setopt(pycurl.POSTFIELDS, post_data)
166 curl.setopt(pycurl.HTTPHEADER, headers)
167
168 if req.read_timeout is None:
169 curl.setopt(pycurl.TIMEOUT, 0)
170 else:
171 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
172
173
174 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
175 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
176
177 curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
178
179
180 if req.curl_config_fn:
181 req.curl_config_fn(curl)
182
183 return _PendingRequest(curl, req, resp_buffer.getvalue)
184
def __init__(self, curl, req, resp_buffer_read):
    """Sets up the bookkeeping for one request that has not finished yet.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @type resp_buffer_read: callable
    @param resp_buffer_read: Function to read response body

    """
    # A request whose outcome is already recorded must not be wrapped again
    assert req.success is None

    (self._curl, self._req, self._resp_buffer_read) = \
        (curl, req, resp_buffer_read)
203
205 """Returns the cURL object.
206
207 """
208 return self._curl
209
211 """Returns the current request.
212
213 """
214 return self._req
215
def Done(self, errmsg):
    """Finishes a request.

    Records the outcome on the request object, copies the response status
    code and the response body onto it, resets the per-request options on
    the cURL handle and finally calls the request's completion callback, if
    one was set.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    curl = self._curl
    req = self._req

    assert req.success is None, "Request has already been finalized"

    try:
        # The local endpoint has to be queried on the cURL handle; the
        # request object has no getinfo(), so asking it always ended up in
        # the fallback below. AttributeError is still tolerated, presumably
        # for pycurl builds that lack the LOCAL_* constants.
        from_str = "from %s:%s " % (
            curl.getinfo(pycurl.LOCAL_IP),
            curl.getinfo(pycurl.LOCAL_PORT)
            )
    except AttributeError:
        from_str = ""
    logging.debug("Request %s%s finished, errmsg=%s", from_str, req, errmsg)

    # An empty or missing error message means success
    req.success = not bool(errmsg)
    req.error = errmsg

    # Get HTTP response code and body
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    req.resp_body = self._resp_buffer_read()

    # Replace the POST data and the write callback so the handle no longer
    # refers to this request's data or response buffer
    curl.setopt(pycurl.POSTFIELDS, "")
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)

    if req.completion_cb:
        req.completion_cb(req)
251
254 """No-op request monitor.
255
256 """
257 @staticmethod
260
261 release = acquire
262 Disable = acquire
263
266 _LOCK = "_lock"
267
269 """Initializes this class.
270
271 """
272 self._owner = owner
273 self._pending_fn = pending_fn
274
275
276 self._lock = locking.SharedLock("PendingHttpRequests")
277 self.acquire = self._lock.acquire
278 self.release = self._lock.release
279
280 @locking.ssynchronized(_LOCK)
282 """Disable monitor.
283
284 """
285 self._pending_fn = None
286
287 @locking.ssynchronized(_LOCK, shared=1)
289 """Retrieves information about pending requests.
290
291 @type requested: set
292 @param requested: Requested information, see C{query.LQ_*}
293
294 """
295
296
297
298 result = []
299
300 if self._pending_fn:
301 owner_name = self._owner.getName()
302
303 for client in self._pending_fn():
304 req = client.GetCurrentRequest()
305 if req:
306 if req.nicename is None:
307 name = "%s%s" % (req.host, req.path)
308 else:
309 name = req.nicename
310 result.append(("rpc/%s" % name, None, [owner_name], None))
311
312 return result
313
316 """cURL request processor.
317
318 This generator yields a tuple once for every completed request, successful or
319 not. The first value in the tuple is the handle, the second an error message
320 or C{None} for successful requests.
321
322 @type multi: C{pycurl.CurlMulti}
323 @param multi: cURL multi object
324 @type requests: sequence
325 @param requests: cURL request handles
326
327 """
328 for curl in requests:
329 multi.add_handle(curl)
330
331 while True:
332 (ret, active) = multi.perform()
333 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
334
335 if ret == pycurl.E_CALL_MULTI_PERFORM:
336
337 continue
338
339 while True:
340 (remaining_messages, successful, failed) = multi.info_read()
341
342 for curl in successful:
343 multi.remove_handle(curl)
344 yield (curl, None)
345
346 for curl, errnum, errmsg in failed:
347 multi.remove_handle(curl)
348 yield (curl, "Error %s: %s" % (errnum, errmsg))
349
350 if remaining_messages == 0:
351 break
352
353 if active == 0:
354
355 break
356
357
358
359
360 multi.select(1.0)
361
366 """Processes any number of HTTP client requests.
367
368 @type requests: list of L{HttpClientRequest}
369 @param requests: List of all requests
370 @param lock_monitor_cb: Callable for registering with lock monitor
371
372 """
373 assert compat.all((req.error is None and
374 req.success is None and
375 req.resp_status_code is None and
376 req.resp_body is None)
377 for req in requests)
378
379
380 curl_to_client = \
381 dict((client.GetCurlHandle(), client)
382 for client in map(lambda req: _StartRequest(_curl(), req), requests))
383
384 assert len(curl_to_client) == len(requests)
385
386 if lock_monitor_cb:
387 monitor = _PendingRequestMonitor(threading.currentThread(),
388 curl_to_client.values)
389 lock_monitor_cb(monitor)
390 else:
391 monitor = _NoOpRequestMonitor
392
393
394 for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
395 monitor.acquire(shared=0)
396 try:
397 curl_to_client.pop(curl).Done(msg)
398 finally:
399 monitor.release()
400
401 assert not curl_to_client, "Not all requests were processed"
402
403
404 monitor.Disable()
405
406 assert compat.all(req.error is not None or
407 (req.success and
408 req.resp_status_code is not None and
409 req.resp_body is not None)
410 for req in requests)
411