1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
# Keys of the request dictionary (see the request parsing code, which
# reads KEY_METHOD and KEY_ARGS); KEY_SUCCESS and KEY_RESULT are
# presumably the matching response keys -- confirm against the response
# parser.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Method names sent in the KEY_METHOD field of a request
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# Upper bound applied to the WaitForJobChange timeout. Derived from the
# default read/write timeout; explicit floor division keeps the value an
# integer regardless of the division semantics in effect.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
74 """Denotes an error in the LUXI protocol."""
75
78 """Connection closed error."""
79
82 """Operation timeout error."""
83
86 """Error on request.
87
88 This signifies an error in the request format or request handling,
89 but not (e.g.) an error in starting up an instance.
90
91 Some common conditions that can trigger this exception:
92 - job submission failed because the job data was wrong
93 - query failed because required fields were missing
94
95 """
96
99 """The master cannot be reached.
100
101 This means that the master daemon is not running or the socket has
102 been removed.
103
104 """
105
108 """Permission denied while connecting to the master socket.
109
110 This means the user doesn't have the proper rights.
111
112 """
113
116 """Low-level transport class.
117
118 This is used on the client side.
119
120 This could be replaced by any other class that provides the same
121 semantics to the Client. This means:
122 - can send messages and receive messages
123 - safe for multithreading
124
125 """
126
def __init__(self, address, timeouts=None):
    """Create the unix stream socket and connect it to C{address}.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a pair of timeouts (in seconds), used for connect and
        for read/write respectively

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the
    read timeout).

    Raises TimeoutError if the connection could not be established
    within the connect timeout; any error raised while connecting is
    propagated after the socket has been closed.

    """
    self.address = address
    if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
        self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        # Keep trying to connect until the connect timeout expires
        # (presumably 1.0 is the delay between attempts -- confirm
        # against utils.Retry)
        try:
            utils.Retry(self._Connect, 1.0, self._ctimeout,
                        args=(self.socket, address, self._ctimeout))
        except utils.RetryTimeout:
            raise TimeoutError("Connect timed out")

        self.socket.settimeout(self._rwtimeout)
    except Exception:
        # Whatever went wrong (including connect timeouts and permission
        # errors), do not leave a half-initialised socket open; the
        # original exception is re-raised unchanged.
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        raise
171
@staticmethod
def _Connect(sock, address, timeout):
    """Make a single attempt to connect C{sock} to C{address}.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout (in seconds) set on the socket before
        connecting
    @raise TimeoutError: if the connect attempt times out
    @raise NoMasterError: if the socket does not exist or the
        connection is refused
    @raise PermissionError: if access to the socket is denied
    @raise utils.RetryAgain: if the attempt failed with EAGAIN and
        should be repeated by the caller

    """
    sock.settimeout(timeout)
    try:
        sock.connect(address)
    except socket.timeout as err:
        raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
        # Guard against errors carrying no arguments, as the receive
        # path does, so that the original error is re-raised instead of
        # an IndexError
        error_code = None
        if err.args:
            error_code = err.args[0]
        if error_code in (errno.ENOENT, errno.ECONNREFUSED):
            raise NoMasterError(address)
        elif error_code in (errno.EPERM, errno.EACCES):
            raise PermissionError(address)
        elif error_code == errno.EAGAIN:
            # Temporary failure; ask the retry loop to try again
            raise utils.RetryAgain()
        raise
189
191 """Make sure we are connected.
192
193 """
194 if self.socket is None:
195 raise ProtocolError("Connection is closed")
196
def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the message payload; it must not contain the message
        terminator (C{constants.LUXI_EOM}), which is appended here
    @raise ProtocolError: if the terminator is found in the payload
    @raise TimeoutError: if sending times out

    """
    if constants.LUXI_EOM in msg:
        raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
        self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
        raise TimeoutError("Sending timeout: %s" % str(err))
212
def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the oldest complete message available, without its
        terminator
    @raise TimeoutError: if a single receive times out or the global
        deadline has passed
    @raise ConnectionClosedError: if the connection is closed while
        reading

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
        # The deadline is only checked between receives, hence the
        # worst case of about twice the read/write timeout
        if time.time() > etime:
            raise TimeoutError("Extended receive timeout")
        while True:
            try:
                data = self.socket.recv(4096)
            except socket.timeout as err:
                # Must come before the socket.error handler:
                # socket.timeout is a subclass of socket.error, so the
                # generic handler would otherwise catch it and re-raise
                # it untranslated
                raise TimeoutError("Receive timeout: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] == errno.EAGAIN:
                    continue
                raise
            break
        if not data:
            raise ConnectionClosedError("Connection closed while reading")
        # Everything before a terminator is a complete message; the
        # trailing (possibly empty) piece is kept for the next read
        new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
        self._buffer = new_msgs.pop()
        self._msgs.extend(new_msgs)
    return self._msgs.popleft()
243
def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    @param msg: the message to send
    @return: the next message returned by Recv

    """
    self.Send(msg)
    return self.Recv()
252
254 """Close the socket"""
255 if self.socket is not None:
256 self.socket.close()
257 self.socket = None
258
261 """Parses a LUXI request message.
262
263 """
264 try:
265 request = serializer.LoadJson(msg)
266 except ValueError, err:
267 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
268
269 logging.debug("LUXI request: %s", request)
270
271 if not isinstance(request, dict):
272 logging.error("LUXI request not a dict: %r", msg)
273 raise ProtocolError("Invalid LUXI request (not a dict)")
274
275 method = request.get(KEY_METHOD, None)
276 args = request.get(KEY_ARGS, None)
277
278 if method is None or args is None:
279 logging.error("LUXI request missing method or arguments: %r", msg)
280 raise ProtocolError(("Invalid LUXI request (no method or arguments"
281 " in request): %r") % msg)
282
283 return (method, args)
284
303
317
331
334 """Send a LUXI request via a transport and return the response.
335
336 """
337 assert callable(transport_cb)
338
339 request_msg = FormatRequest(method, args)
340
341
342 response_msg = transport_cb(request_msg)
343
344 (success, result) = ParseResponse(response_msg)
345
346 if success:
347 return result
348
349 errors.MaybeRaise(result)
350 raise RequestError(result)
351
354 """High-level client implementation.
355
356 This uses a backing Transport-like class on top of which it
357 implements data serialization/deserialization.
358
359 """
361 """Constructor for the Client class.
362
363 Arguments:
364 - address: a valid address for the used transport class
365 - timeouts: a list of timeouts, to be used on connect and read/write
366 - transport: a Transport-like class
367
368
369 If timeouts is not passed, the default timeouts of the transport
370 class are used.
371
372 """
373 if address is None:
374 address = constants.MASTER_SOCKET
375 self.address = address
376 self.timeouts = timeouts
377 self.transport_class = transport
378 self.transport = None
379 self._InitTransport()
380
382 """(Re)initialize the transport if needed.
383
384 """
385 if self.transport is None:
386 self.transport = self.transport_class(self.address,
387 timeouts=self.timeouts)
388
390 """Close the transport, ignoring errors.
391
392 """
393 if self.transport is None:
394 return
395 try:
396 old_transp = self.transport
397 self.transport = None
398 old_transp.Close()
399 except Exception:
400 pass
401
410
416
419
422
426
432
435
438
442
446 """Waits for changes on a job.
447
448 @param job_id: Job ID
449 @type fields: list
450 @param fields: List of field names to be observed
451 @type prev_job_info: None or list
452 @param prev_job_info: Previously received job information
453 @type prev_log_serial: None or int/long
454 @param prev_log_serial: Highest log serial number previously received
455 @type timeout: int/float
456 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
457 be capped to that value)
458
459 """
460 assert timeout >= 0, "Timeout can not be negative"
461 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
462 (job_id, fields, prev_job_info,
463 prev_log_serial,
464 min(WFJC_TIMEOUT, timeout)))
465
473
476
479
480 - def QueryNodes(self, names, fields, use_locking):
482
485
488
491
494
497