1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
45
46
# Keys of the JSON objects exchanged over the LUXI socket (requests are
# parsed with serializer.LoadJson and looked up via these keys)
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Request method names a client passes as KEY_METHOD (see CallMethod users)
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"


# Set of every REQ_* name defined above (20 entries); keep in sync when a new
# request is added. NOTE(review): compat.UniqueFrozenset presumably rejects
# duplicate entries -- confirm in ganeti.compat.
REQ_ALL = compat.UniqueFrozenset([
  REQ_ARCHIVE_JOB,
  REQ_AUTO_ARCHIVE_JOBS,
  REQ_CANCEL_JOB,
  REQ_CHANGE_JOB_PRIORITY,
  REQ_QUERY,
  REQ_QUERY_CLUSTER_INFO,
  REQ_QUERY_CONFIG_VALUES,
  REQ_QUERY_EXPORTS,
  REQ_QUERY_FIELDS,
  REQ_QUERY_GROUPS,
  REQ_QUERY_INSTANCES,
  REQ_QUERY_JOBS,
  REQ_QUERY_NODES,
  REQ_QUERY_NETWORKS,
  REQ_QUERY_TAGS,
  REQ_SET_DRAIN_FLAG,
  REQ_SET_WATCHER_PAUSE,
  REQ_SUBMIT_JOB,
  REQ_SUBMIT_MANY_JOBS,
  REQ_WAIT_FOR_JOB_CHANGE,
  ])
97
# Default connect timeout, in seconds
DEF_CTMO = 10
# Default read/write timeout, in seconds
DEF_RWTO = 60


# Upper bound (seconds) for a single WaitForJobChange call; client-side
# timeouts larger than this are capped to it. Presumably kept well below
# DEF_RWTO so the daemon can answer before the client's receive times out.
# Floor division keeps the value an integer (29) under both Python 2 and
# Python 3 true-division semantics.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
106 """Denotes an error in the LUXI protocol."""
107
110 """Connection closed error."""
111
114 """Operation timeout error."""
115
118 """Error on request.
119
120 This signifies an error in the request format or request handling,
121 but not (e.g.) an error in starting up an instance.
122
123 Some common conditions that can trigger this exception:
124 - job submission failed because the job data was wrong
125 - query failed because required fields were missing
126
127 """
128
131 """The master cannot be reached.
132
133 This means that the master daemon is not running or the socket has
134 been removed.
135
136 """
137
140 """Permission denied while connecting to the master socket.
141
142 This means the user doesn't have the proper rights.
143
144 """
145
148 """Low-level transport class.
149
150 This is used on the client side.
151
152 This could be replaced by any other class that provides the same
153 semantics to the Client. This means:
154 - can send messages and receive messages
155 - safe for multithreading
156
157 """
158
def __init__(self, address, timeouts=None):
  """Constructor for the low-level transport class.

  Arguments:
    - address: a valid address for the transport in use (here, the path
      of a unix socket)
    - timeouts: a (connect, read/write) pair of timeouts, in seconds

  There are two timeouts used since we might want to wait for a long
  time for a response, but the connect timeout should be lower.

  If not passed, we use a default of 10 and respectively 60 seconds
  (L{DEF_CTMO} and L{DEF_RWTO}).

  Note that on reading data, since the timeout applies to an
  individual receive, it might be that the total duration is longer
  than the timeout value passed (we make a hard limit at twice the read
  timeout).

  If connecting fails for any reason, the socket is closed before the
  exception is propagated, so no file descriptor is left open.

  """
  self.address = address
  if timeouts is None:
    self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
  else:
    self._ctimeout, self._rwtimeout = timeouts

  self.socket = None
  self._buffer = ""
  self._msgs = collections.deque()

  try:
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Try to connect; _Connect raises utils.RetryAgain on EAGAIN so the
    # attempt is repeated until the connect timeout expires
    try:
      utils.Retry(self._Connect, 1.0, self._ctimeout,
                  args=(self.socket, address, self._ctimeout))
    except utils.RetryTimeout:
      raise TimeoutError("Connect timed out")

    # From now on every send/receive is bounded by the read/write timeout
    self.socket.settimeout(self._rwtimeout)
  except Exception:
    # Clean up on *any* failure -- including TimeoutError (raised above)
    # and PermissionError (raised by _Connect) -- then re-raise unchanged
    if self.socket is not None:
      self.socket.close()
    self.socket = None
    raise
203
204 @staticmethod
206 sock.settimeout(timeout)
207 try:
208 sock.connect(address)
209 except socket.timeout, err:
210 raise TimeoutError("Connect timed out: %s" % str(err))
211 except socket.error, err:
212 error_code = err.args[0]
213 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
214 raise NoMasterError(address)
215 elif error_code in (errno.EPERM, errno.EACCES):
216 raise PermissionError(address)
217 elif error_code == errno.EAGAIN:
218
219 raise utils.RetryAgain()
220 raise
221
223 """Make sure we are connected.
224
225 """
226 if self.socket is None:
227 raise ProtocolError("Connection is closed")
228
def Send(self, msg):
  """Send a single message without waiting for any reply.

  @param msg: the serialized message; it must not itself contain the
      LUXI end-of-message marker, which is appended here
  @raise ProtocolError: if C{msg} contains the terminator, or the
      connection has already been closed
  @raise TimeoutError: if the socket times out while sending

  """
  eom = constants.LUXI_EOM
  if eom in msg:
    raise ProtocolError("Message terminator found in payload")

  self._CheckSocket()
  # Frame the payload with the terminator so the peer can split messages
  framed = msg + eom
  try:
    self.socket.sendall(framed)
  except socket.timeout as err:
    raise TimeoutError("Sending timeout: %s" % str(err))
244
246 """Try to receive a message from the socket.
247
248 In case we already have messages queued, we just return from the
249 queue. Otherwise, we try to read data with a _rwtimeout network
250 timeout, and making sure we don't go over 2x_rwtimeout as a global
251 limit.
252
253 """
254 self._CheckSocket()
255 etime = time.time() + self._rwtimeout
256 while not self._msgs:
257 if time.time() > etime:
258 raise TimeoutError("Extended receive timeout")
259 while True:
260 try:
261 data = self.socket.recv(4096)
262 except socket.timeout, err:
263 raise TimeoutError("Receive timeout: %s" % str(err))
264 except socket.error, err:
265 if err.args and err.args[0] == errno.EAGAIN:
266 continue
267 raise
268 break
269 if not data:
270 raise ConnectionClosedError("Connection closed while reading")
271 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
272 self._buffer = new_msgs.pop()
273 self._msgs.extend(new_msgs)
274 return self._msgs.popleft()
275
def Call(self, msg):
  """Send C{msg} and block until the next message is received.

  A thin convenience wrapper: L{Send} followed by L{Recv}.

  @return: the next message read from the transport

  """
  self.Send(msg)
  reply = self.Recv()
  return reply
284
286 """Close the socket"""
287 if self.socket is not None:
288 self.socket.close()
289 self.socket = None
290
293 """Parses a LUXI request message.
294
295 """
296 try:
297 request = serializer.LoadJson(msg)
298 except ValueError, err:
299 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
300
301 logging.debug("LUXI request: %s", request)
302
303 if not isinstance(request, dict):
304 logging.error("LUXI request not a dict: %r", msg)
305 raise ProtocolError("Invalid LUXI request (not a dict)")
306
307 method = request.get(KEY_METHOD, None)
308 args = request.get(KEY_ARGS, None)
309 version = request.get(KEY_VERSION, None)
310
311 if method is None or args is None:
312 logging.error("LUXI request missing method or arguments: %r", msg)
313 raise ProtocolError(("Invalid LUXI request (no method or arguments"
314 " in request): %r") % msg)
315
316 return (method, args, version)
317
339
356
373
376 """Send a LUXI request via a transport and return the response.
377
378 """
379 assert callable(transport_cb)
380
381 request_msg = FormatRequest(method, args, version=version)
382
383
384 response_msg = transport_cb(request_msg)
385
386 (success, result, resp_version) = ParseResponse(response_msg)
387
388
389 if resp_version is not None and resp_version != version:
390 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
391 (version, resp_version))
392
393 if success:
394 return result
395
396 errors.MaybeRaise(result)
397 raise RequestError(result)
398
401 """High-level client implementation.
402
403 This uses a backing Transport-like class on top of which it
404 implements data serialization/deserialization.
405
406 """
408 """Constructor for the Client class.
409
410 Arguments:
411 - address: a valid address for the transport class in use
412 - timeouts: a list of timeouts, to be used on connect and read/write
413 - transport: a Transport-like class
414
415
416 If timeout is not passed, the default timeouts of the transport
417 class are used.
418
419 """
420 if address is None:
421 address = pathutils.MASTER_SOCKET
422 self.address = address
423 self.timeouts = timeouts
424 self.transport_class = transport
425 self.transport = None
426 self._InitTransport()
427
429 """(Re)initialize the transport if needed.
430
431 """
432 if self.transport is None:
433 self.transport = self.transport_class(self.address,
434 timeouts=self.timeouts)
435
437 """Close the transport, ignoring errors.
438
439 """
440 if self.transport is None:
441 return
442 try:
443 old_transp = self.transport
444 self.transport = None
445 old_transp.Close()
446 except Exception:
447 pass
448
457
459 """Close the underlying connection.
460
461 """
462 self._CloseTransport()
463
473
476
479
483
489
490 @staticmethod
492 try:
493 return int(job_id)
494 except ValueError:
495 raise RequestError("Invalid parameter passed to %s as job id: "
496 " expected integer, got value %s" %
497 (request_name, job_id))
498
502
506
510
514
518 """Waits for changes on a job.
519
520 @param job_id: Job ID
521 @type fields: list
522 @param fields: List of field names to be observed
523 @type prev_job_info: None or list
524 @param prev_job_info: Previously received job information
525 @type prev_log_serial: None or int/long
526 @param prev_log_serial: Highest log serial number previously received
527 @type timeout: int/float
528 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
529 be capped to that value)
530
531 """
532 assert timeout >= 0, "Timeout can not be negative"
533 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
534 (job_id, fields, prev_job_info,
535 prev_log_serial,
536 min(WFJC_TIMEOUT, timeout)))
537
546
def Query(self, what, fields, qfilter):
  """Run a generic query for resources/items on the master daemon.

  @param what: resource type, one of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: names of the fields to return
  @type qfilter: None or list
  @param qfilter: optional query filter
  @rtype: L{objects.QueryResponse}
  @return: the deserialized response of the L{REQ_QUERY} call

  """
  return objects.QueryResponse.FromDict(
    self.CallMethod(REQ_QUERY, (what, fields, qfilter)))
560
562 """Query for available fields.
563
564 @param what: One of L{constants.QR_VIA_LUXI}
565 @type fields: None or list of strings
566 @param fields: List of requested fields
567 @rtype: L{objects.QueryFieldsResponse}
568
569 """
570 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
571 return objects.QueryFieldsResponse.FromDict(result)
572
575
578
579 - def QueryNodes(self, names, fields, use_locking):
581
584
587
590
593
596
599