1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """HTTP client module.
22
23 """
24
25
26
27
28
29
30
31
32 import os
33 import select
34 import socket
35 import errno
36 import threading
37
38 from ganeti import workerpool
39 from ganeti import http
40 from ganeti import utils
41
42
43 HTTP_CLIENT_THREADS = 10
44
45
def __init__(self, host, port, method, path, headers=None, post_data=None,
             ssl_params=None, ssl_verify_peer=False):
  """Describes an HTTP request.

  Stores the request parameters on the instance and initializes every
  result attribute (status, error and response fields) to C{None}.

  @type host: string
  @param host: Hostname
  @type port: int
  @param port: Port
  @type method: string
  @param method: Method name
  @type path: string
  @param path: Request path
  @type headers: dict or None
  @param headers: Additional headers to send
  @type post_data: string or None
  @param post_data: Additional data to send
  @type ssl_params: HttpSslParams
  @param ssl_params: SSL key and certificate
  @type ssl_verify_peer: bool
  @param ssl_verify_peer: Whether to compare our certificate with
      server's certificate
  @raise AssertionError: If C{post_data} is given for a method other than
      POST or PUT, or if C{path} does not start with a slash

  """
  if post_data is not None:
    # The check accepts POST and PUT; the message must name the same methods
    assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
      "Only POST and PUT requests support sending data"

  assert path.startswith("/"), "Path must start with slash (/)"

  # Request attributes
  self.host = host
  self.port = port
  self.ssl_params = ssl_params
  self.ssl_verify_peer = ssl_verify_peer
  self.method = method
  self.path = path
  self.headers = headers
  self.post_data = post_data

  # Outcome of the request; both stay None until the request was executed
  self.success = None
  self.error = None

  # Raw response message
  self.response = None

  # Individual response fields
  self.resp_version = None
  self.resp_status_code = None
  self.resp_reason = None
  self.resp_headers = None
  self.resp_body = None
98
100 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
101 "%s:%s" % (self.host, self.port),
102 self.method,
103 self.path]
104
105 return "<%s at %#x>" % (" ".join(status), id(self))
106
107
110
111
113
114 START_LINE_LENGTH_MAX = 512
115 HEADER_LENGTH_MAX = 4096
116
118 """Parses the status line sent by the server.
119
120 """
121
122 assert start_line
123
124 try:
125 [version, status, reason] = start_line.split(None, 2)
126 except ValueError:
127 try:
128 [version, status] = start_line.split(None, 1)
129 reason = ""
130 except ValueError:
131 version = http.HTTP_0_9
132
133 if version:
134 version = version.upper()
135
136
137 try:
138 status = int(status)
139 if status < 100 or status > 999:
140 status = -1
141 except (TypeError, ValueError):
142 status = -1
143
144 if status == -1:
145 raise http.HttpError("Invalid status code (%r)" % start_line)
146
147 return http.HttpServerToClientStartLine(version, status, reason)
148
149
151
152 DEFAULT_HEADERS = {
153 http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
154
155 http.HTTP_CONNECTION: "close",
156 }
157
158
159
160
161 CONNECT_TIMEOUT = 5
162 WRITE_TIMEOUT = 10
163 READ_TIMEOUT = None
164 CLOSE_TIMEOUT = 1
165
167 """Initializes the HttpClientRequestExecutor class.
168
169 @type req: HttpClientRequest
170 @param req: Request object
171
172 """
173 http.HttpBase.__init__(self)
174 self.request = req
175
176 try:
177
178 self.sock = self._CreateSocket(req.ssl_params,
179 req.ssl_verify_peer)
180
181
182 self.sock.settimeout(None)
183
184
185 self.sock.setblocking(0)
186
187 response_msg_reader = None
188 response_msg = None
189 force_close = True
190
191 self._Connect()
192 try:
193 self._SendRequest()
194 (response_msg_reader, response_msg) = self._ReadResponse()
195
196
197 force_close = False
198 finally:
199
200 force_close = True
201 http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
202 self.WRITE_TIMEOUT, response_msg_reader,
203 force_close)
204
205 self.sock.close()
206 self.sock = None
207
208 req.response = response_msg
209
210 req.resp_version = req.response.start_line.version
211 req.resp_status_code = req.response.start_line.code
212 req.resp_reason = req.response.start_line.reason
213 req.resp_headers = req.response.headers
214 req.resp_body = req.response.body
215
216 req.success = True
217 req.error = None
218
219 except http.HttpError, err:
220 req.success = False
221 req.error = str(err)
222
224 """Non-blocking connect to host with timeout.
225
226 """
227 connected = False
228 while True:
229 try:
230 connect_error = self.sock.connect_ex((self.request.host,
231 self.request.port))
232 except socket.gaierror, err:
233 raise http.HttpError("Connection failed: %s" % str(err))
234
235 if connect_error == errno.EINTR:
236
237 pass
238
239 elif connect_error == 0:
240
241 connected = True
242 break
243
244 elif connect_error == errno.EINPROGRESS:
245
246 break
247
248 raise http.HttpError("Connection failed (%s: %s)" %
249 (connect_error, os.strerror(connect_error)))
250
251 if not connected:
252
253 event = utils.WaitForFdCondition(self.sock, select.POLLOUT,
254 self.CONNECT_TIMEOUT)
255 if event is None:
256 raise http.HttpError("Timeout while connecting to server")
257
258
259 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
260 if connect_error != 0:
261 raise http.HttpError("Connection failed (%s: %s)" %
262 (connect_error, os.strerror(connect_error)))
263
264
265 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
266
267
268
269
270
271
272 if self.using_ssl:
273 self.sock.set_connect_state()
274 try:
275 http.Handshake(self.sock, self.WRITE_TIMEOUT)
276 except http.HttpSessionHandshakeUnexpectedEOF:
277 raise http.HttpError("Server closed connection during SSL handshake")
278
311
328
329
331 """Data class for pending requests.
332
333 """
335 self.request = request
336
337
338 self.done = threading.Event()
339
341 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
342 "req=%r" % self.request]
343
344 return "<%s at %#x>" % (" ".join(status), id(self))
345
346
348 """HTTP client worker class.
349
350 """
352 try:
353 HttpClientRequestExecutor(pend_req.request)
354 finally:
355 pend_req.done.set()
356
357
364
365
367 """Manages HTTP requests.
368
369 """
372
375
377 """Execute HTTP requests.
378
379 This function can be called from multiple threads at the same time.
380
381 @type requests: List of HttpClientRequest instances
382 @param requests: The requests to execute
383 @rtype: List of HttpClientRequest instances
384 @return: The list of requests passed in
385
386 """
387
388 pending = [_HttpClientPendingRequest(req) for req in requests]
389
390 try:
391
392 for pend_req in pending:
393 self._wpool.AddTask(pend_req)
394
395 finally:
396
397
398
399
400
401 for pend_req in pending:
402 pend_req.done.wait()
403
404
405 return requests
406
410