1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
# Keys of the serialized LUXI message dictionaries.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the methods a client can invoke over LUXI.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect timeout, in seconds.
DEF_CTMO = 10
# Default read/write timeout, in seconds.
DEF_RWTO = 60

# Upper bound (in seconds) for the WaitForJobChange timeout; larger
# requested values are capped to this. Floor division keeps the value an
# integer (29) regardless of whether true division is in effect; a plain
# "/" would yield 29.5 under true division.
# NOTE(review): presumably kept well below DEF_RWTO so the server-side
# wait ends before the client's read timeout fires -- confirm.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
75 """Denotes an error in the LUXI protocol."""
76
79 """Connection closed error."""
80
83 """Operation timeout error."""
84
87 """Error on request.
88
89 This signifies an error in the request format or request handling,
90 but not (e.g.) an error in starting up an instance.
91
92 Some common conditions that can trigger this exception:
93 - job submission failed because the job data was wrong
94 - query failed because required fields were missing
95
96 """
97
100 """The master cannot be reached.
101
102 This means that the master daemon is not running or the socket has
103 been removed.
104
105 """
106
109 """Permission denied while connecting to the master socket.
110
111 This means the user doesn't have the proper rights.
112
113 """
114
117 """Low-level transport class.
118
119 This is used on the client side.
120
121 This could be replace by any other class that provides the same
122 semantics to the Client. This means:
123 - can send messages and receive messages
124 - safe for multithreading
125
126 """
127
def __init__(self, address, timeouts=None):
    """Constructor for the transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a (connect, read/write) pair of timeouts, in seconds

    Two timeouts are used since we might want to wait for a long time
    for a response, while the connect timeout should be lower.

    If not passed, the defaults DEF_CTMO (10 seconds) and DEF_RWTO
    (60 seconds) are used.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration may be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    @raise TimeoutError: if the connection cannot be established within
        the connect timeout
    @raise NoMasterError, PermissionError, socket.error: propagated from
        the connection attempt

    On any failure after the socket has been created, the socket is
    closed and C{self.socket} is reset to None before the exception
    propagates, so no file descriptor is leaked.

    """
    self.address = address
    if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
        self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    # If socket creation itself fails there is nothing to clean up:
    # self.socket is still None and the error simply propagates.
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    connected = False
    try:
        try:
            utils.Retry(self._Connect, 1.0, self._ctimeout,
                        args=(self.socket, address, self._ctimeout))
        except utils.RetryTimeout:
            raise TimeoutError("Connect timed out")

        self.socket.settimeout(self._rwtimeout)
        connected = True
    finally:
        # Close on every failure path, including TimeoutError and
        # PermissionError from the connect attempt, not just socket.error
        # and NoMasterError.
        if not connected:
            self.socket.close()
            self.socket = None
172
173 @staticmethod
175 sock.settimeout(timeout)
176 try:
177 sock.connect(address)
178 except socket.timeout, err:
179 raise TimeoutError("Connect timed out: %s" % str(err))
180 except socket.error, err:
181 error_code = err.args[0]
182 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
183 raise NoMasterError(address)
184 elif error_code in (errno.EPERM, errno.EACCES):
185 raise PermissionError(address)
186 elif error_code == errno.EAGAIN:
187
188 raise utils.RetryAgain()
189 raise
190
192 """Make sure we are connected.
193
194 """
195 if self.socket is None:
196 raise ProtocolError("Connection is closed")
197
198 - def Send(self, msg):
199 """Send a message.
200
201 This just sends a message and doesn't wait for the response.
202
203 """
204 if constants.LUXI_EOM in msg:
205 raise ProtocolError("Message terminator found in payload")
206
207 self._CheckSocket()
208 try:
209
210 self.socket.sendall(msg + constants.LUXI_EOM)
211 except socket.timeout, err:
212 raise TimeoutError("Sending timeout: %s" % str(err))
213
215 """Try to receive a message from the socket.
216
217 In case we already have messages queued, we just return from the
218 queue. Otherwise, we try to read data with a _rwtimeout network
219 timeout, and making sure we don't go over 2x_rwtimeout as a global
220 limit.
221
222 """
223 self._CheckSocket()
224 etime = time.time() + self._rwtimeout
225 while not self._msgs:
226 if time.time() > etime:
227 raise TimeoutError("Extended receive timeout")
228 while True:
229 try:
230 data = self.socket.recv(4096)
231 except socket.error, err:
232 if err.args and err.args[0] == errno.EAGAIN:
233 continue
234 raise
235 except socket.timeout, err:
236 raise TimeoutError("Receive timeout: %s" % str(err))
237 break
238 if not data:
239 raise ConnectionClosedError("Connection closed while reading")
240 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
241 self._buffer = new_msgs.pop()
242 self._msgs.extend(new_msgs)
243 return self._msgs.popleft()
244
def Call(self, msg):
    """Send msg, then block until the next message arrives and return it.

    Convenience wrapper: equivalent to Send(msg) followed by Recv().

    """
    self.Send(msg)
    reply = self.Recv()
    return reply
253
255 """Close the socket"""
256 if self.socket is not None:
257 self.socket.close()
258 self.socket = None
259
262 """Parses a LUXI request message.
263
264 """
265 try:
266 request = serializer.LoadJson(msg)
267 except ValueError, err:
268 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
269
270 logging.debug("LUXI request: %s", request)
271
272 if not isinstance(request, dict):
273 logging.error("LUXI request not a dict: %r", msg)
274 raise ProtocolError("Invalid LUXI request (not a dict)")
275
276 method = request.get(KEY_METHOD, None)
277 args = request.get(KEY_ARGS, None)
278 version = request.get(KEY_VERSION, None)
279
280 if method is None or args is None:
281 logging.error("LUXI request missing method or arguments: %r", msg)
282 raise ProtocolError(("Invalid LUXI request (no method or arguments"
283 " in request): %r") % msg)
284
285 return (method, args, version)
286
306
323
340
343 """Send a LUXI request via a transport and return the response.
344
345 """
346 assert callable(transport_cb)
347
348 request_msg = FormatRequest(method, args, version=version)
349
350
351 response_msg = transport_cb(request_msg)
352
353 (success, result, resp_version) = ParseResponse(response_msg)
354
355
356 if resp_version is not None and resp_version != version:
357 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
358 (version, resp_version))
359
360 if success:
361 return result
362
363 errors.MaybeRaise(result)
364 raise RequestError(result)
365
368 """High-level client implementation.
369
370 This uses a backing Transport-like class on top of which it
371 implements data serialization/deserialization.
372
373 """
375 """Constructor for the Client class.
376
377 Arguments:
378 - address: a valid address the the used transport class
379 - timeout: a list of timeouts, to be used on connect and read/write
380 - transport: a Transport-like class
381
382
383 If timeout is not passed, the default timeouts of the transport
384 class are used.
385
386 """
387 if address is None:
388 address = constants.MASTER_SOCKET
389 self.address = address
390 self.timeouts = timeouts
391 self.transport_class = transport
392 self.transport = None
393 self._InitTransport()
394
396 """(Re)initialize the transport if needed.
397
398 """
399 if self.transport is None:
400 self.transport = self.transport_class(self.address,
401 timeouts=self.timeouts)
402
404 """Close the transport, ignoring errors.
405
406 """
407 if self.transport is None:
408 return
409 try:
410 old_transp = self.transport
411 self.transport = None
412 old_transp.Close()
413 except Exception:
414 pass
415
424
431
434
437
441
447
450
453
457
461 """Waits for changes on a job.
462
463 @param job_id: Job ID
464 @type fields: list
465 @param fields: List of field names to be observed
466 @type prev_job_info: None or list
467 @param prev_job_info: Previously received job information
468 @type prev_log_serial: None or int/long
469 @param prev_log_serial: Highest log serial number previously received
470 @type timeout: int/float
471 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
472 be capped to that value)
473
474 """
475 assert timeout >= 0, "Timeout can not be negative"
476 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
477 (job_id, fields, prev_job_info,
478 prev_log_serial,
479 min(WFJC_TIMEOUT, timeout)))
480
488
491
494
495 - def QueryNodes(self, names, fields, use_locking):
497
500
503
506
509
512