1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Module for the LUXI Unix socket protocol.

This module implements the local Unix socket protocol (LUXI). You only
need this module and the opcodes module in the client program in order
to communicate with the master.

The module is also used by the master daemon.

"""
40
41 import socket
42 import collections
43 import time
44 import errno
45 import logging
46
47 from ganeti import serializer
48 from ganeti import constants
49 from ganeti import errors
50 from ganeti import utils
51 from ganeti import objects
52 from ganeti import pathutils
53
54
# Field names of the JSON request/response envelope
KEY_METHOD = constants.LUXI_KEY_METHOD
KEY_ARGS = constants.LUXI_KEY_ARGS
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
KEY_RESULT = constants.LUXI_KEY_RESULT
KEY_VERSION = constants.LUXI_KEY_VERSION

# Request (method) names, re-exported from the constants module
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = (
    constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE)
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
REQ_QUERY = constants.LUXI_REQ_QUERY
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
REQ_ALL = constants.LUXI_REQ_ALL

# Timeouts: default connect timeout, default read/write timeout, and the
# upper bound applied to wait-for-job-change requests
DEF_CTMO = constants.LUXI_DEF_CTMO
DEF_RWTO = constants.LUXI_DEF_RWTO
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol."""

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the errors.LuxiError base is an assumption -- confirm it.
91
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the peer closes the connection, e.g. while a reply is
    still being read.

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the ProtocolError base is an assumption -- confirm it.
95
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving takes too long.

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the ProtocolError base is an assumption -- confirm it.
    # Under Python 3 this name shadows the builtin TimeoutError in this
    # module.
99
class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing
      - a malformed parameter (such as a non-integer job id) was passed
        to a client helper

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the ProtocolError base is an assumption -- confirm it.
112
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed (connect failed with ECONNREFUSED or ENOENT). The
    exception argument is the address that was tried.

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the ProtocolError base is an assumption -- confirm it.
121
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights (connect failed
    with EPERM or EACCES). The exception argument is the address that
    was tried.

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the ProtocolError base is an assumption -- confirm it.
    # Under Python 3 this name shadows the builtin PermissionError in this
    # module.
129
class Transport:
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    NOTE(review): this implementation takes no locks itself, so concurrent
    Send/Recv calls on one instance are not synchronized here; confirm how
    instances are shared between threads.

    """

    # NOTE(review): this `class` line is missing from the listing and was
    # restored; the name follows the "Transport-like class" references in
    # the Client docstrings -- confirm the original bases.

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a (connect, read/write) pair of timeouts, in seconds

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, DEF_CTMO and DEF_RWTO are used.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        If connecting fails for any reason, the socket is closed before the
        exception is propagated.

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        self._buffer = ""
        self._msgs = collections.deque()

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # _Connect raises utils.RetryAgain on transient errors (presumably
            # retried by utils.Retry); exhaustion surfaces as RetryTimeout.
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(self.socket, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")

            self.socket.settimeout(self._rwtimeout)
        except Exception:
            # Clean up on *any* failure. The previous handler only covered
            # socket.error and NoMasterError, so TimeoutError and
            # PermissionError left the socket open. The original exception
            # is re-raised unchanged.
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    @staticmethod
    def _Connect(sock, address, timeout):
        """Connect C{sock} to C{address}, translating socket errors.

        @raise TimeoutError: if the connect attempt times out
        @raise NoMasterError: on ENOENT or ECONNREFUSED
        @raise PermissionError: on EPERM or EACCES
        @raise utils.RetryAgain: on EAGAIN, so the caller can retry

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            # Guard against an empty args tuple, as Recv already does.
            error_code = err.args[0] if err.args else None
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Transient condition (e.g. a full listen backlog); ask for
                # another attempt.
                raise utils.RetryAgain()
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        @raise ProtocolError: if the socket has already been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        @raise ProtocolError: if C{msg} contains the message terminator or
            the connection is closed
        @raise TimeoutError: if sending times out

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        @raise TimeoutError: on a socket timeout or when the global limit
            is exceeded
        @raise ConnectionClosedError: if the peer closes the connection

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    # EAGAIN is transient: simply try the read again.
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # Everything before the last terminator is a complete message;
            # the trailing piece is kept as the start of the next one.
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket (a no-op if it is already closed)."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
274
# NOTE(review): the `def` line of this function is missing from this
# listing. The body reads a single input, `msg` (the raw request text);
# restore the header before this module can be imported.
277 """Parses a LUXI request message.
278
279 """
# Decode the JSON payload; malformed input is reported as a ProtocolError.
280 try:
281 request = serializer.LoadJson(msg)
282 except ValueError, err:
283 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
284
285 logging.debug("LUXI request: %s", request)
286
# The request envelope must be a JSON object (dict).
287 if not isinstance(request, dict):
288 logging.error("LUXI request not a dict: %r", msg)
289 raise ProtocolError("Invalid LUXI request (not a dict)")
290
# Method and arguments are mandatory; the version is optional and stays
# None when the client did not send one.
291 method = request.get(KEY_METHOD, None)
292 args = request.get(KEY_ARGS, None)
293 version = request.get(KEY_VERSION, None)
294
295 if method is None or args is None:
296 logging.error("LUXI request missing method or arguments: %r", msg)
297 raise ProtocolError(("Invalid LUXI request (no method or arguments"
298 " in request): %r") % msg)
299
# Returns a (method, args, version) tuple.
300 return (method, args, version)
301
323
340
357
# NOTE(review): the `def` line of this function is missing from this
# listing. The body reads `transport_cb`, `method`, `args` and `version`;
# restore the header (and confirm the default for `version`) before use.
360 """Send a LUXI request via a transport and return the response.
361
362 """
# NOTE(review): `assert` is removed under `python -O`; a non-callable
# transport_cb would then only fail at the call further down.
363 assert callable(transport_cb)
364
365 request_msg = FormatRequest(method, args, version=version)
366
367
# The transport callback sends the serialized request and returns the raw
# reply message.
368 response_msg = transport_cb(request_msg)
369
370 (success, result, resp_version) = ParseResponse(response_msg)
371
372
# A response without a version (None) is accepted; only a version that
# differs from the one we sent is rejected.
373 if resp_version is not None and resp_version != version:
374 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
375 (version, resp_version))
376
377 if success:
378 return result
379
# On failure, errors.MaybeRaise presumably re-raises a more specific
# exception encoded in `result`; if it returns, fall back to RequestError.
380 errors.MaybeRaise(result)
381 raise RequestError(result)
382
# NOTE(review): the `class` statement for this class is missing from this
# listing (its constructor docstring calls it the Client class); restore
# it before this module can be imported.
385 """High-level client implementation.
386
387 This uses a backing Transport-like class on top of which it
388 implements data serialization/deserialization.
389
390 """
# NOTE(review): the `def` line of this constructor is missing from this
# listing. The body reads `address`, `timeouts` and `transport` (note the
# docstring's "timeout" refers to `timeouts`); the `address is None` check
# suggests address defaults to None -- confirm the real signature and
# defaults before restoring it.
392 """Constructor for the Client class.
393
394 Arguments:
395 - address: a valid address the the used transport class
396 - timeout: a list of timeouts, to be used on connect and read/write
397 - transport: a Transport-like class
398
399
400 If timeout is not passed, the default timeouts of the transport
401 class are used.
402
403 """
# Fall back to the master daemon's socket when no address is given.
404 if address is None:
405 address = pathutils.MASTER_SOCKET
406 self.address = address
# Forwarded as-is to the transport class; None leaves the choice of
# timeouts to the transport.
407 self.timeouts = timeouts
# Remembered so _InitTransport can (re)create the transport later.
408 self.transport_class = transport
409 self.transport = None
# Create the transport right away.
410 self._InitTransport()
411
# NOTE(review): the `def` line is missing from this listing; the
# `self._InitTransport()` call in the constructor shows it is
# `_InitTransport(self)`.
413 """(Re)initialize the transport if needed.
414
415 """
# Only build a transport when there is none: at construction time, or
# after _CloseTransport has reset self.transport to None.
416 if self.transport is None:
417 self.transport = self.transport_class(self.address,
418 timeouts=self.timeouts)
419
# NOTE(review): the `def` line is missing from this listing; the
# `self._CloseTransport()` call later in this class shows it is
# `_CloseTransport(self)`.
421 """Close the transport, ignoring errors.
422
423 """
424 if self.transport is None:
425 return
426 try:
# Detach the transport before closing it, so the client counts as closed
# (and _InitTransport will build a fresh one) even if Close() fails.
427 old_transp = self.transport
428 self.transport = None
429 old_transp.Close()
# Deliberately best-effort: any error raised while closing is discarded.
430 except Exception:
431 pass
432
441
# NOTE(review): the `def` line of this method is missing from this listing.
443 """Close the underlying connection.
444
445 """
# Best-effort: _CloseTransport swallows errors raised by the transport.
446 self._CloseTransport()
447
457
460
463
467
471
477
# NOTE(review): the `def` line of this static method is missing from this
# listing; the body reads `request_name` and `job_id`. It returns the job
# id converted to int, or raises RequestError.
478 @staticmethod
480 try:
481 return int(job_id)
# NOTE(review): int() raises TypeError (not ValueError) for values such as
# None or a list, so those escape as TypeError instead of RequestError;
# consider catching (TypeError, ValueError). Also, the two message
# fragments below concatenate to "job id:  expected" (two spaces).
482 except ValueError:
483 raise RequestError("Invalid parameter passed to %s as job id: "
484 " expected integer, got value %s" %
485 (request_name, job_id))
486
490
494
498
502
# NOTE(review): the `def` line of this method is missing from this listing;
# the body reads job_id, fields, prev_job_info, prev_log_serial and timeout.
506 """Waits for changes on a job.
507
508 @param job_id: Job ID
509 @type fields: list
510 @param fields: List of field names to be observed
511 @type prev_job_info: None or list
512 @param prev_job_info: Previously received job information
513 @type prev_log_serial: None or int/long
514 @param prev_log_serial: Highest log serial number previously received
515 @type timeout: int/float
516 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
517 be capped to that value)
518
519 """
# NOTE(review): `assert` is removed under `python -O`, so a negative
# timeout is only rejected when assertions are enabled.
520 assert timeout >= 0, "Timeout can not be negative"
# min() applies the WFJC_TIMEOUT cap described in the docstring.
521 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
522 (job_id, fields, prev_job_info,
523 prev_log_serial,
524 min(WFJC_TIMEOUT, timeout)))
525
534
535 - def Query(self, what, fields, qfilter):
536 """Query for resources/items.
537
538 @param what: One of L{constants.QR_VIA_LUXI}
539 @type fields: List of strings
540 @param fields: List of requested fields
541 @type qfilter: None or list
542 @param qfilter: Query filter
543 @rtype: L{objects.QueryResponse}
544
545 """
# Send a REQ_QUERY request and convert the result (presumably a dict,
# given FromDict) into an objects.QueryResponse.
546 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
547 return objects.QueryResponse.FromDict(result)
548
# NOTE(review): the `def` line of this method is missing from this listing;
# the body reads `what` and `fields`.
550 """Query for available fields.
551
552 @param what: One of L{constants.QR_VIA_LUXI}
553 @type fields: None or list of strings
554 @param fields: List of requested fields
555 @rtype: L{objects.QueryFieldsResponse}
556
557 """
# Same pattern as Query: send REQ_QUERY_FIELDS and convert the result into
# an objects.QueryFieldsResponse.
558 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
559 return objects.QueryFieldsResponse.FromDict(result)
560
563
566
567 - def QueryNodes(self, names, fields, use_locking):
# NOTE(review): the body of this method is missing from this listing.
569
572
575
578
581
584
587