1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37 import warnings
38
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44
45
# Keys used in LUXI request/response messages
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# LUXI request (method) names
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect timeout, in seconds
DEF_CTMO = 10
# Default read/write timeout, in seconds
DEF_RWTO = 60

# WaitForJobChange timeout, in seconds: WaitForJobChange requests cap their
# timeout argument at this value. Floor division keeps it the integer 29
# even under true-division semantics. NOTE(review): presumably kept well
# below DEF_RWTO so the server replies before the client's read timeout
# fires -- confirm.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
80 """Denotes an error in the LUXI protocol."""
81
84 """Connection closed error."""
85
88 """Operation timeout error."""
89
92 """Error on request.
93
94 This signifies an error in the request format or request handling,
95 but not (e.g.) an error in starting up an instance.
96
97 Some common conditions that can trigger this exception:
98 - job submission failed because the job data was wrong
99 - query failed because required fields were missing
100
101 """
102
105 """The master cannot be reached.
106
107 This means that the master daemon is not running or the socket has
108 been removed.
109
110 """
111
114 """Permission denied while connecting to the master socket.
115
116 This means the user doesn't have the proper rights.
117
118 """
119
122 """Low-level transport class.
123
124 This is used on the client side.
125
126 This could be replace by any other class that provides the same
127 semantics to the Client. This means:
128 - can send messages and receive messages
129 - safe for multithreading
130
131 """
132
def __init__(self, address, timeouts=None):
  """Constructor for the Transport class.

  Arguments:
    - address: a valid address for the used transport class
    - timeouts: a (connect timeout, read/write timeout) pair

  There are two timeouts used since we might want to wait for a long
  time for a response, but the connect timeout should be lower.

  If not passed, we use a default of 10 and respectively 60 seconds.

  Note that on reading data, since the timeout applies to an
  individual receive, it might be that the total duration is longer
  than timeout value passed (we make a hard limit at twice the read
  timeout).

  If creating or connecting the socket fails for any reason, the
  socket is closed and C{self.socket} reset to None before the
  exception is re-raised, so a failed constructor does not leak a
  file descriptor.

  @raise TimeoutError: if connecting times out
  @raise NoMasterError: if the master socket is missing or refuses
      connections
  @raise PermissionError: if access to the master socket is denied
  @raise socket.error: for other socket-level failures

  """
  self.address = address
  if timeouts is None:
    self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
  else:
    self._ctimeout, self._rwtimeout = timeouts

  self.socket = None
  self._buffer = ""
  self._msgs = collections.deque()

  try:
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Try to connect; _Connect raises utils.RetryAgain on EAGAIN so that
    # utils.Retry tries again until the connect timeout is exhausted
    try:
      utils.Retry(self._Connect, 1.0, self._ctimeout,
                  args=(self.socket, address, self._ctimeout))
    except utils.RetryTimeout:
      raise TimeoutError("Connect timed out")

    self.socket.settimeout(self._rwtimeout)
  except Exception:
    # Catch everything, not only socket.error/NoMasterError: TimeoutError
    # and PermissionError from the connect step must close the socket too
    if self.socket is not None:
      self.socket.close()
    self.socket = None
    raise
177
178 @staticmethod
180 sock.settimeout(timeout)
181 try:
182 sock.connect(address)
183 except socket.timeout, err:
184 raise TimeoutError("Connect timed out: %s" % str(err))
185 except socket.error, err:
186 error_code = err.args[0]
187 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
188 raise NoMasterError(address)
189 elif error_code in (errno.EPERM, errno.EACCES):
190 raise PermissionError(address)
191 elif error_code == errno.EAGAIN:
192
193 raise utils.RetryAgain()
194 raise
195
197 """Make sure we are connected.
198
199 """
200 if self.socket is None:
201 raise ProtocolError("Connection is closed")
202
203 - def Send(self, msg):
204 """Send a message.
205
206 This just sends a message and doesn't wait for the response.
207
208 """
209 if constants.LUXI_EOM in msg:
210 raise ProtocolError("Message terminator found in payload")
211
212 self._CheckSocket()
213 try:
214
215 self.socket.sendall(msg + constants.LUXI_EOM)
216 except socket.timeout, err:
217 raise TimeoutError("Sending timeout: %s" % str(err))
218
220 """Try to receive a message from the socket.
221
222 In case we already have messages queued, we just return from the
223 queue. Otherwise, we try to read data with a _rwtimeout network
224 timeout, and making sure we don't go over 2x_rwtimeout as a global
225 limit.
226
227 """
228 self._CheckSocket()
229 etime = time.time() + self._rwtimeout
230 while not self._msgs:
231 if time.time() > etime:
232 raise TimeoutError("Extended receive timeout")
233 while True:
234 try:
235 data = self.socket.recv(4096)
236 except socket.timeout, err:
237 raise TimeoutError("Receive timeout: %s" % str(err))
238 except socket.error, err:
239 if err.args and err.args[0] == errno.EAGAIN:
240 continue
241 raise
242 break
243 if not data:
244 raise ConnectionClosedError("Connection closed while reading")
245 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
246 self._buffer = new_msgs.pop()
247 self._msgs.extend(new_msgs)
248 return self._msgs.popleft()
249
def Call(self, msg):
  """Send a message and block until the next message is received.

  Convenience wrapper: L{Send} followed by L{Recv}.

  @return: whatever L{Recv} returns for the next incoming message

  """
  self.Send(msg)
  reply = self.Recv()
  return reply
258
260 """Close the socket"""
261 if self.socket is not None:
262 self.socket.close()
263 self.socket = None
264
267 """Parses a LUXI request message.
268
269 """
270 try:
271 request = serializer.LoadJson(msg)
272 except ValueError, err:
273 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
274
275 logging.debug("LUXI request: %s", request)
276
277 if not isinstance(request, dict):
278 logging.error("LUXI request not a dict: %r", msg)
279 raise ProtocolError("Invalid LUXI request (not a dict)")
280
281 method = request.get(KEY_METHOD, None)
282 args = request.get(KEY_ARGS, None)
283 version = request.get(KEY_VERSION, None)
284
285 if method is None or args is None:
286 logging.error("LUXI request missing method or arguments: %r", msg)
287 raise ProtocolError(("Invalid LUXI request (no method or arguments"
288 " in request): %r") % msg)
289
290 return (method, args, version)
291
294 """Parses a LUXI response message.
295
296 """
297
298 try:
299 data = serializer.LoadJson(msg)
300 except KeyboardInterrupt:
301 raise
302 except Exception, err:
303 raise ProtocolError("Error while deserializing response: %s" % str(err))
304
305
306 if not (isinstance(data, dict) and
307 KEY_SUCCESS in data and
308 KEY_RESULT in data):
309 raise ProtocolError("Invalid response from server: %r" % data)
310
311 return (data[KEY_SUCCESS], data[KEY_RESULT],
312 data.get(KEY_VERSION, None))
313
330
347
350 """Send a LUXI request via a transport and return the response.
351
352 """
353 assert callable(transport_cb)
354
355 request_msg = FormatRequest(method, args, version=version)
356
357
358 response_msg = transport_cb(request_msg)
359
360 (success, result, resp_version) = ParseResponse(response_msg)
361
362
363 if resp_version is not None and resp_version != version:
364 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
365 (version, resp_version))
366
367 if success:
368 return result
369
370 errors.MaybeRaise(result)
371 raise RequestError(result)
372
375 """High-level client implementation.
376
377 This uses a backing Transport-like class on top of which it
378 implements data serialization/deserialization.
379
380 """
382 """Constructor for the Client class.
383
384 Arguments:
385 - address: a valid address the the used transport class
386 - timeout: a list of timeouts, to be used on connect and read/write
387 - transport: a Transport-like class
388
389
390 If timeout is not passed, the default timeouts of the transport
391 class are used.
392
393 """
394 if address is None:
395 address = constants.MASTER_SOCKET
396 self.address = address
397 self.timeouts = timeouts
398 self.transport_class = transport
399 self.transport = None
400 self._InitTransport()
401
403 """(Re)initialize the transport if needed.
404
405 """
406 if self.transport is None:
407 self.transport = self.transport_class(self.address,
408 timeouts=self.timeouts)
409
411 """Close the transport, ignoring errors.
412
413 """
414 if self.transport is None:
415 return
416 try:
417 old_transp = self.transport
418 self.transport = None
419 old_transp.Close()
420 except Exception:
421 pass
422
431
433 """Close the underlying connection.
434
435 """
436 self._CloseTransport()
437
444
447
450
454
460
463
466
470
474 """Waits for changes on a job.
475
476 @param job_id: Job ID
477 @type fields: list
478 @param fields: List of field names to be observed
479 @type prev_job_info: None or list
480 @param prev_job_info: Previously received job information
481 @type prev_log_serial: None or int/long
482 @param prev_log_serial: Highest log serial number previously received
483 @type timeout: int/float
484 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
485 be capped to that value)
486
487 """
488 assert timeout >= 0, "Timeout can not be negative"
489 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
490 (job_id, fields, prev_job_info,
491 prev_log_serial,
492 min(WFJC_TIMEOUT, timeout)))
493
501
def Query(self, what, fields, filter_):
  """Query for resources/items.

  Builds an L{objects.QueryRequest}, sends it as a L{REQ_QUERY} call and
  converts the raw answer back into an L{objects.QueryResponse}.

  @param what: One of L{constants.QR_VIA_LUXI}
  @type fields: List of strings
  @param fields: List of requested fields
  @type filter_: None or list
  @param filter_: Query filter
  @rtype: L{objects.QueryResponse}

  """
  request = objects.QueryRequest(what=what, fields=fields, filter=filter_)
  raw_answer = self.CallMethod(REQ_QUERY, request.ToDict())
  return objects.QueryResponse.FromDict(raw_answer)
516
529
532
535
536 - def QueryNodes(self, names, fields, use_locking):
538
541
544
547
550
553
558