1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
45
46
# Field names used inside serialized LUXI messages
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# LUXI request (method) names
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"


# Set of every request name defined above (kept in alphabetical order).
# Every REQ_* constant must appear here; REQ_QUERY_NETWORKS was previously
# defined but missing from the set.
REQ_ALL = compat.UniqueFrozenset([
  REQ_ARCHIVE_JOB,
  REQ_AUTO_ARCHIVE_JOBS,
  REQ_CANCEL_JOB,
  REQ_CHANGE_JOB_PRIORITY,
  REQ_QUERY,
  REQ_QUERY_CLUSTER_INFO,
  REQ_QUERY_CONFIG_VALUES,
  REQ_QUERY_EXPORTS,
  REQ_QUERY_FIELDS,
  REQ_QUERY_GROUPS,
  REQ_QUERY_INSTANCES,
  REQ_QUERY_JOBS,
  REQ_QUERY_NETWORKS,
  REQ_QUERY_NODES,
  REQ_QUERY_TAGS,
  REQ_SET_DRAIN_FLAG,
  REQ_SET_WATCHER_PAUSE,
  REQ_SUBMIT_JOB,
  REQ_SUBMIT_MANY_JOBS,
  REQ_WAIT_FOR_JOB_CHANGE,
  ])

# Default connect and read/write timeouts, in seconds, used by the
# transport constructor when no explicit timeouts are given
DEF_CTMO = 10
DEF_RWTO = 60


# Maximum timeout for WaitForJobChange requests; larger values requested
# by the client are capped to this. Floor division keeps the value an
# integer (29) independent of the interpreter's "/" semantics.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
105 """Denotes an error in the LUXI protocol."""
106
109 """Connection closed error."""
110
113 """Operation timeout error."""
114
117 """Error on request.
118
119 This signifies an error in the request format or request handling,
120 but not (e.g.) an error in starting up an instance.
121
122 Some common conditions that can trigger this exception:
123 - job submission failed because the job data was wrong
124 - query failed because required fields were missing
125
126 """
127
130 """The master cannot be reached.
131
132 This means that the master daemon is not running or the socket has
133 been removed.
134
135 """
136
139 """Permission denied while connecting to the master socket.
140
141 This means the user doesn't have the proper rights.
142
143 """
144
147 """Low-level transport class.
148
149 This is used on the client side.
150
151 This could be replace by any other class that provides the same
152 semantics to the Client. This means:
153 - can send messages and receive messages
154 - safe for multithreading
155
156 """
157
def __init__(self, address, timeouts=None):
  """Constructor for the transport class.

  Arguments:
    - address: a valid address for the used transport class
    - timeouts: a (connect, read/write) pair of timeouts, in seconds

  There are two timeouts used since we might want to wait for a long
  time for a response, but the connect timeout should be lower.

  If not passed, we use a default of 10 and respectively 60 seconds
  (L{DEF_CTMO} and L{DEF_RWTO}).

  Note that on reading data, since the timeout applies to an
  individual receive, it might be that the total duration is longer
  than the timeout value passed (we make a hard limit at twice the read
  timeout).

  @raise TimeoutError: if the connection is not established within the
      connect timeout
  @raise NoMasterError: if the socket is missing or refuses connections
  @raise PermissionError: if access to the socket is denied

  """
  self.address = address
  if timeouts is None:
    self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
  else:
    self._ctimeout, self._rwtimeout = timeouts

  self.socket = None
  self._buffer = ""
  self._msgs = collections.deque()

  try:
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Try to connect; _Connect raises utils.RetryAgain on EAGAIN, so the
    # attempt is repeated until the connect timeout expires
    try:
      utils.Retry(self._Connect, 1.0, self._ctimeout,
                  args=(self.socket, address, self._ctimeout))
    except utils.RetryTimeout:
      raise TimeoutError("Connect timed out")

    self.socket.settimeout(self._rwtimeout)
  except (socket.error, NoMasterError, PermissionError, TimeoutError):
    # Release the socket on every failure the connect path can produce;
    # previously TimeoutError and PermissionError leaked the descriptor
    if self.socket is not None:
      self.socket.close()
    self.socket = None
    raise
202
203 @staticmethod
205 sock.settimeout(timeout)
206 try:
207 sock.connect(address)
208 except socket.timeout, err:
209 raise TimeoutError("Connect timed out: %s" % str(err))
210 except socket.error, err:
211 error_code = err.args[0]
212 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
213 raise NoMasterError(address)
214 elif error_code in (errno.EPERM, errno.EACCES):
215 raise PermissionError(address)
216 elif error_code == errno.EAGAIN:
217
218 raise utils.RetryAgain()
219 raise
220
222 """Make sure we are connected.
223
224 """
225 if self.socket is None:
226 raise ProtocolError("Connection is closed")
227
228 - def Send(self, msg):
229 """Send a message.
230
231 This just sends a message and doesn't wait for the response.
232
233 """
234 if constants.LUXI_EOM in msg:
235 raise ProtocolError("Message terminator found in payload")
236
237 self._CheckSocket()
238 try:
239
240 self.socket.sendall(msg + constants.LUXI_EOM)
241 except socket.timeout, err:
242 raise TimeoutError("Sending timeout: %s" % str(err))
243
245 """Try to receive a message from the socket.
246
247 In case we already have messages queued, we just return from the
248 queue. Otherwise, we try to read data with a _rwtimeout network
249 timeout, and making sure we don't go over 2x_rwtimeout as a global
250 limit.
251
252 """
253 self._CheckSocket()
254 etime = time.time() + self._rwtimeout
255 while not self._msgs:
256 if time.time() > etime:
257 raise TimeoutError("Extended receive timeout")
258 while True:
259 try:
260 data = self.socket.recv(4096)
261 except socket.timeout, err:
262 raise TimeoutError("Receive timeout: %s" % str(err))
263 except socket.error, err:
264 if err.args and err.args[0] == errno.EAGAIN:
265 continue
266 raise
267 break
268 if not data:
269 raise ConnectionClosedError("Connection closed while reading")
270 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
271 self._buffer = new_msgs.pop()
272 self._msgs.extend(new_msgs)
273 return self._msgs.popleft()
274
def Call(self, msg):
  """Send C{msg} and block until the matching reply arrives.

  Convenience wrapper that simply chains L{Send} and L{Recv}.

  @param msg: the serialized request message
  @return: the raw reply message, as returned by L{Recv}

  """
  self.Send(msg)
  reply = self.Recv()
  return reply
283
285 """Close the socket"""
286 if self.socket is not None:
287 self.socket.close()
288 self.socket = None
289
292 """Parses a LUXI request message.
293
294 """
295 try:
296 request = serializer.LoadJson(msg)
297 except ValueError, err:
298 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
299
300 logging.debug("LUXI request: %s", request)
301
302 if not isinstance(request, dict):
303 logging.error("LUXI request not a dict: %r", msg)
304 raise ProtocolError("Invalid LUXI request (not a dict)")
305
306 method = request.get(KEY_METHOD, None)
307 args = request.get(KEY_ARGS, None)
308 version = request.get(KEY_VERSION, None)
309
310 if method is None or args is None:
311 logging.error("LUXI request missing method or arguments: %r", msg)
312 raise ProtocolError(("Invalid LUXI request (no method or arguments"
313 " in request): %r") % msg)
314
315 return (method, args, version)
316
338
355
372
375 """Send a LUXI request via a transport and return the response.
376
377 """
378 assert callable(transport_cb)
379
380 request_msg = FormatRequest(method, args, version=version)
381
382
383 response_msg = transport_cb(request_msg)
384
385 (success, result, resp_version) = ParseResponse(response_msg)
386
387
388 if resp_version is not None and resp_version != version:
389 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
390 (version, resp_version))
391
392 if success:
393 return result
394
395 errors.MaybeRaise(result)
396 raise RequestError(result)
397
400 """High-level client implementation.
401
402 This uses a backing Transport-like class on top of which it
403 implements data serialization/deserialization.
404
405 """
407 """Constructor for the Client class.
408
409 Arguments:
410 - address: a valid address the the used transport class
411 - timeout: a list of timeouts, to be used on connect and read/write
412 - transport: a Transport-like class
413
414
415 If timeout is not passed, the default timeouts of the transport
416 class are used.
417
418 """
419 if address is None:
420 address = pathutils.MASTER_SOCKET
421 self.address = address
422 self.timeouts = timeouts
423 self.transport_class = transport
424 self.transport = None
425 self._InitTransport()
426
428 """(Re)initialize the transport if needed.
429
430 """
431 if self.transport is None:
432 self.transport = self.transport_class(self.address,
433 timeouts=self.timeouts)
434
436 """Close the transport, ignoring errors.
437
438 """
439 if self.transport is None:
440 return
441 try:
442 old_transp = self.transport
443 self.transport = None
444 old_transp.Close()
445 except Exception:
446 pass
447
456
458 """Close the underlying connection.
459
460 """
461 self._CloseTransport()
462
472
475
478
482
488
491
494
497
501
505 """Waits for changes on a job.
506
507 @param job_id: Job ID
508 @type fields: list
509 @param fields: List of field names to be observed
510 @type prev_job_info: None or list
511 @param prev_job_info: Previously received job information
512 @type prev_log_serial: None or int/long
513 @param prev_log_serial: Highest log serial number previously received
514 @type timeout: int/float
515 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
516 be capped to that value)
517
518 """
519 assert timeout >= 0, "Timeout can not be negative"
520 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
521 (job_id, fields, prev_job_info,
522 prev_log_serial,
523 min(WFJC_TIMEOUT, timeout)))
524
532
def Query(self, what, fields, qfilter):
  """Run a generic resource query over the LUXI connection.

  @param what: the resource type, one of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: names of the fields to retrieve
  @type qfilter: None or list
  @param qfilter: optional query filter
  @rtype: L{objects.QueryResponse}
  @return: the server reply, rebuilt from its dictionary form

  """
  raw_reply = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
  return objects.QueryResponse.FromDict(raw_reply)
546
548 """Query for available fields.
549
550 @param what: One of L{constants.QR_VIA_LUXI}
551 @type fields: None or list of strings
552 @param fields: List of requested fields
553 @rtype: L{objects.QueryFieldsResponse}
554
555 """
556 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
557 return objects.QueryFieldsResponse.FromDict(result)
558
561
564
565 - def QueryNodes(self, names, fields, use_locking):
567
570
573
576
579
582
585