1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43
44
# Dictionary keys used in LUXI messages
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Job-related request names
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"

# Query request names
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"

# Requests that set a flag/pause
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Every request name defined above, as an immutable set
REQ_ALL = frozenset((
    REQ_SUBMIT_JOB,
    REQ_SUBMIT_MANY_JOBS,
    REQ_WAIT_FOR_JOB_CHANGE,
    REQ_CANCEL_JOB,
    REQ_ARCHIVE_JOB,
    REQ_AUTO_ARCHIVE_JOBS,
    REQ_QUERY,
    REQ_QUERY_FIELDS,
    REQ_QUERY_JOBS,
    REQ_QUERY_INSTANCES,
    REQ_QUERY_NODES,
    REQ_QUERY_GROUPS,
    REQ_QUERY_EXPORTS,
    REQ_QUERY_CONFIG_VALUES,
    REQ_QUERY_CLUSTER_INFO,
    REQ_QUERY_TAGS,
    REQ_SET_DRAIN_FLAG,
    REQ_SET_WATCHER_PAUSE,
))

# Default connect and read/write timeouts, in seconds
DEF_CTMO, DEF_RWTO = 10, 60

# Upper bound applied to the timeout sent with WaitForJobChange requests;
# derived from the default read/write timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
100 """Denotes an error in the LUXI protocol."""
101
104 """Connection closed error."""
105
108 """Operation timeout error."""
109
112 """Error on request.
113
114 This signifies an error in the request format or request handling,
115 but not (e.g.) an error in starting up an instance.
116
117 Some common conditions that can trigger this exception:
118 - job submission failed because the job data was wrong
119 - query failed because required fields were missing
120
121 """
122
125 """The master cannot be reached.
126
127 This means that the master daemon is not running or the socket has
128 been removed.
129
130 """
131
134 """Permission denied while connecting to the master socket.
135
136 This means the user doesn't have the proper rights.
137
138 """
139
142 """Low-level transport class.
143
144 This is used on the client side.
145
146 This could be replaced by any other class that provides the same
147 semantics to the Client. This means:
148 - can send messages and receive messages
149 - safe for multithreading
150
151 """
152
def __init__(self, address, timeouts=None):
    """Create the transport and connect it to the given address.

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the read
    timeout).

    @param address: address of the unix socket to connect to
    @type timeouts: None or pair
    @param timeouts: (connect timeout, read/write timeout) in seconds; if
        not passed, L{DEF_CTMO} and L{DEF_RWTO} are used
    @raise TimeoutError: if the connection attempts did not succeed within
        the connect timeout

    """
    self.address = address
    if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
        self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        # Keep retrying the connect until it succeeds or the connect
        # timeout expires
        try:
            utils.Retry(self._Connect, 1.0, self._ctimeout,
                        args=(self.socket, address, self._ctimeout))
        except utils.RetryTimeout:
            raise TimeoutError("Connect timed out")

        # From here on the (longer) read/write timeout applies
        self.socket.settimeout(self._rwtimeout)
    except BaseException:
        # Whatever went wrong (socket error, timeout, permission problem,
        # interruption), do not leave a half-initialized socket open;
        # release it and let the caller see the original exception
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        raise
197
198 @staticmethod
200 sock.settimeout(timeout)
201 try:
202 sock.connect(address)
203 except socket.timeout, err:
204 raise TimeoutError("Connect timed out: %s" % str(err))
205 except socket.error, err:
206 error_code = err.args[0]
207 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
208 raise NoMasterError(address)
209 elif error_code in (errno.EPERM, errno.EACCES):
210 raise PermissionError(address)
211 elif error_code == errno.EAGAIN:
212
213 raise utils.RetryAgain()
214 raise
215
217 """Make sure we are connected.
218
219 """
220 if self.socket is None:
221 raise ProtocolError("Connection is closed")
222
def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: serialized message; must not contain the message
        terminator (L{constants.LUXI_EOM})
    @raise ProtocolError: if the terminator is found in the payload
    @raise TimeoutError: if the socket times out while sending

    """
    # The terminator delimits messages on the wire, so a payload that
    # contains it cannot be sent as a single message
    if constants.LUXI_EOM in msg:
        raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
        self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
        raise TimeoutError("Sending timeout: %s" % str(err))
238
240 """Try to receive a message from the socket.
241
242 In case we already have messages queued, we just return from the
243 queue. Otherwise, we try to read data with a _rwtimeout network
244 timeout, and making sure we don't go over 2x_rwtimeout as a global
245 limit.
246
247 """
248 self._CheckSocket()
249 etime = time.time() + self._rwtimeout
250 while not self._msgs:
251 if time.time() > etime:
252 raise TimeoutError("Extended receive timeout")
253 while True:
254 try:
255 data = self.socket.recv(4096)
256 except socket.timeout, err:
257 raise TimeoutError("Receive timeout: %s" % str(err))
258 except socket.error, err:
259 if err.args and err.args[0] == errno.EAGAIN:
260 continue
261 raise
262 break
263 if not data:
264 raise ConnectionClosedError("Connection closed while reading")
265 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
266 self._buffer = new_msgs.pop()
267 self._msgs.extend(new_msgs)
268 return self._msgs.popleft()
269
def Call(self, msg):
    """Perform one request/response round trip.

    The message is handed to L{Send} and the value returned by the
    following L{Recv} call is passed back to the caller.

    """
    self.Send(msg)
    response = self.Recv()
    return response
278
280 """Close the socket"""
281 if self.socket is not None:
282 self.socket.close()
283 self.socket = None
284
287 """Parses a LUXI request message.
288
289 """
290 try:
291 request = serializer.LoadJson(msg)
292 except ValueError, err:
293 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
294
295 logging.debug("LUXI request: %s", request)
296
297 if not isinstance(request, dict):
298 logging.error("LUXI request not a dict: %r", msg)
299 raise ProtocolError("Invalid LUXI request (not a dict)")
300
301 method = request.get(KEY_METHOD, None)
302 args = request.get(KEY_ARGS, None)
303 version = request.get(KEY_VERSION, None)
304
305 if method is None or args is None:
306 logging.error("LUXI request missing method or arguments: %r", msg)
307 raise ProtocolError(("Invalid LUXI request (no method or arguments"
308 " in request): %r") % msg)
309
310 return (method, args, version)
311
333
350
367
370 """Send a LUXI request via a transport and return the response.
371
372 """
373 assert callable(transport_cb)
374
375 request_msg = FormatRequest(method, args, version=version)
376
377
378 response_msg = transport_cb(request_msg)
379
380 (success, result, resp_version) = ParseResponse(response_msg)
381
382
383 if resp_version is not None and resp_version != version:
384 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
385 (version, resp_version))
386
387 if success:
388 return result
389
390 errors.MaybeRaise(result)
391 raise RequestError(result)
392
395 """High-level client implementation.
396
397 This uses a backing Transport-like class on top of which it
398 implements data serialization/deserialization.
399
400 """
402 """Constructor for the Client class.
403
404 Arguments:
405 - address: a valid address for the used transport class
406 - timeout: a list of timeouts, to be used on connect and read/write
407 - transport: a Transport-like class
408
409
410 If timeout is not passed, the default timeouts of the transport
411 class are used.
412
413 """
414 if address is None:
415 address = constants.MASTER_SOCKET
416 self.address = address
417 self.timeouts = timeouts
418 self.transport_class = transport
419 self.transport = None
420 self._InitTransport()
421
423 """(Re)initialize the transport if needed.
424
425 """
426 if self.transport is None:
427 self.transport = self.transport_class(self.address,
428 timeouts=self.timeouts)
429
431 """Close the transport, ignoring errors.
432
433 """
434 if self.transport is None:
435 return
436 try:
437 old_transp = self.transport
438 self.transport = None
439 old_transp.Close()
440 except Exception:
441 pass
442
451
453 """Close the underlying connection.
454
455 """
456 self._CloseTransport()
457
467
470
473
477
483
486
489
493
497 """Waits for changes on a job.
498
499 @param job_id: Job ID
500 @type fields: list
501 @param fields: List of field names to be observed
502 @type prev_job_info: None or list
503 @param prev_job_info: Previously received job information
504 @type prev_log_serial: None or int/long
505 @param prev_log_serial: Highest log serial number previously received
506 @type timeout: int/float
507 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
508 be capped to that value)
509
510 """
511 assert timeout >= 0, "Timeout can not be negative"
512 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
513 (job_id, fields, prev_job_info,
514 prev_log_serial,
515 min(WFJC_TIMEOUT, timeout)))
516
524
def Query(self, what, fields, qfilter):
    """Issue a generic query request and wrap the reply in an object.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    query_args = (what, fields, qfilter)
    raw_response = self.CallMethod(REQ_QUERY, query_args)
    # Convert the plain dictionary received into a response object
    return objects.QueryResponse.FromDict(raw_response)
538
540 """Query for available fields.
541
542 @param what: One of L{constants.QR_VIA_LUXI}
543 @type fields: None or list of strings
544 @param fields: List of requested fields
545 @rtype: L{objects.QueryFieldsResponse}
546
547 """
548 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
549 return objects.QueryFieldsResponse.FromDict(result)
550
553
556
557 - def QueryNodes(self, names, fields, use_locking):
559
562
565
568
571
574