1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
# Dictionary keys of the request/response messages.
# NOTE(review): the code that builds and parses these messages is not
# visible in this part of the file -- confirm the exact usage there.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Names of the requests understood by the master daemon
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"

# Default timeouts in seconds: connect, and read/write respectively
DEF_CTMO = 10
DEF_RWTO = 60
63
64
66 """Denotes an error in the server communication"""
67
68
70 """Connection closed error"""
71
72
74 """Operation timeout error"""
75
76
78 """Encoding failure on the sending side"""
79
80
82 """Decoding failure on the receiving side"""
83
84
86 """Error on request
87
88 This signifies an error in the request format or request handling,
89 but not (e.g.) an error in starting up an instance.
90
91 Some common conditions that can trigger this exception:
92 - job submission failed because the job data was wrong
93 - query failed because required fields were missing
94
95 """
96
97
99 """The master cannot be reached
100
101 This means that the master daemon is not running or the socket has
102 been removed.
103
104 """
105
106
108 """Low-level transport class.
109
110 This is used on the client side.
111
112 This could be replaced by any other class that provides the same
113 semantics to the Client. This means:
114 - can send messages and receive messages
115 - safe for multithreading
116
117 """
118
def __init__(self, address, timeouts=None, eom=None):
    """Open the connection to the given unix socket address.

    Arguments:
      - address: a valid address for the used transport class (here
        it is passed to connect() on an AF_UNIX stream socket)
      - timeouts: a (connect, read/write) pair of timeouts, in seconds
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Raises:
      - TimeoutError: the connect attempt timed out
      - NoMasterError: the socket path does not exist or the connection
        was refused
      - socket.error: any other socket-level failure

    In all of these cases the socket is closed and self.socket is reset
    to None before the exception propagates.

    """
    self.address = address
    if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
        self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Partial (not yet terminated) message data and the queue of fully
    # received messages
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
        self.eom = '\3'
    else:
        self.eom = eom

    try:
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # The (shorter) connect timeout only applies while connecting
        self.socket.settimeout(self._ctimeout)
        try:
            self.socket.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            # Guard against exceptions created without arguments
            if err.args and err.args[0] in (errno.ENOENT,
                                            errno.ECONNREFUSED):
                raise NoMasterError((address,))
            raise
        self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError, TimeoutError):
        # TimeoutError must be listed here as well, otherwise a connect
        # timeout would leave the half-initialized socket open
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        raise
172
174 """Make sure we are connected.
175
176 """
177 if self.socket is None:
178 raise ProtocolError("Connection is closed")
179
def Send(self, msg):
    """Transmit a single message, without waiting for any reply.

    The end-of-message marker is appended to the message before it is
    written to the socket.

    Raises:
      - EncodingError: the message itself contains the end-of-message
        marker
      - TimeoutError: the write timed out

    """
    if self.eom in msg:
        raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    payload = msg + self.eom
    try:
        self.socket.sendall(payload)
    except socket.timeout as timeout_err:
        raise TimeoutError("Sending timeout: %s" % str(timeout_err))
194
def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit: the deadline is checked before each read, and a single read
    can itself block for up to _rwtimeout.

    Returns the oldest complete message, without the end-of-message
    marker.

    Raises:
      - TimeoutError: a single read timed out, or the overall deadline
        passed before a complete message arrived
      - ConnectionClosedError: the peer closed the connection
      - socket.error: any other socket-level failure

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
        if time.time() > etime:
            raise TimeoutError("Extended receive timeout")
        while True:
            try:
                data = self.socket.recv(4096)
            except socket.timeout as err:
                # This handler has to come before the socket.error one:
                # socket.timeout is a subclass of socket.error, so in the
                # opposite order it would never be reached
                raise TimeoutError("Receive timeout: %s" % str(err))
            except socket.error as err:
                # EAGAIN just means "no data yet", so retry the read
                if err.args and err.args[0] == errno.EAGAIN:
                    continue
                raise
            break
        if not data:
            # An empty read means the other side has closed the socket
            raise ConnectionClosedError("Connection closed while reading")
        # Everything after the last terminator is an incomplete message
        # and is kept in the buffer for the next read
        new_msgs = (self._buffer + data).split(self.eom)
        self._buffer = new_msgs.pop()
        self._msgs.extend(new_msgs)
    return self._msgs.popleft()
225
def Call(self, msg):
    """Do a full round-trip: send the message, then read one reply.

    This is only a convenience wrapper chaining Send and Recv; whatever
    these two raise is passed on to the caller unchanged.

    """
    self.Send(msg)
    reply = self.Recv()
    return reply
234
236 """Close the socket"""
237 if self.socket is not None:
238 self.socket.close()
239 self.socket = None
240
241
243 """High-level client implementation.
244
245 This uses a backing Transport-like class on top of which it
246 implements data serialization/deserialization.
247
248 """
250 """Constructor for the Client class.
251
252 Arguments:
253 - address: a valid address for the used transport class
254 - timeouts: a list of timeouts, to be used on connect and read/write
255 - transport: a Transport-like class
256
257
258 If timeout is not passed, the default timeouts of the transport
259 class are used.
260
261 """
262 if address is None:
263 address = constants.MASTER_SOCKET
264 self.address = address
265 self.timeouts = timeouts
266 self.transport_class = transport
267 self.transport = None
268 self._InitTransport()
269
271 """(Re)initialize the transport if needed.
272
273 """
274 if self.transport is None:
275 self.transport = self.transport_class(self.address,
276 timeouts=self.timeouts)
277
279 """Close the transport, ignoring errors.
280
281 """
282 if self.transport is None:
283 return
284 try:
285 old_transp = self.transport
286 self.transport = None
287 old_transp.Close()
288 except Exception:
289 pass
290
331
334
338
344
347
350
354
364
367
370
371 - def QueryNodes(self, names, fields, use_locking):
373
376
379
382
383
384
385