Package ganeti :: Module luxi
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36   
 37  from ganeti import serializer 
 38  from ganeti import constants 
 39  from ganeti import errors 
 40   
 41   
 42  KEY_METHOD = 'method' 
 43  KEY_ARGS = 'args' 
 44  KEY_SUCCESS = "success" 
 45  KEY_RESULT = "result" 
 46   
 47  REQ_SUBMIT_JOB = "SubmitJob" 
 48  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 49  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 50  REQ_CANCEL_JOB = "CancelJob" 
 51  REQ_ARCHIVE_JOB = "ArchiveJob" 
 52  REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs" 
 53  REQ_QUERY_JOBS = "QueryJobs" 
 54  REQ_QUERY_INSTANCES = "QueryInstances" 
 55  REQ_QUERY_NODES = "QueryNodes" 
 56  REQ_QUERY_EXPORTS = "QueryExports" 
 57  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 58  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 59  REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" 
 60   
 61  DEF_CTMO = 10 
 62  DEF_RWTO = 60 
 63   
 64   
class ProtocolError(Exception):
    """Denotes an error in the server communication.

    The more specific communication errors in this module derive from it,
    so catching ProtocolError catches all of them.

    """
    pass
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Signals that the peer closed the connection.

    """
    pass
# NOTE: under Python 3 this name shadows the built-in TimeoutError within
# this module; code importing it from here gets this protocol-level class.
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Signals that a connect, send or receive operation timed out.

    """
    pass
class EncodingError(ProtocolError):
    """Encoding failure on the sending side.

    Signals that an outgoing message could not be encoded for transport.

    """
    pass
class DecodingError(ProtocolError):
    """Decoding failure on the receiving side.

    Signals that an incoming message could not be decoded or is malformed.

    """
    pass
class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
    pass
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed.

    """
    pass
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    It could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    NOTE(review): this implementation does no locking of its own; the
    thread-safety item above is the contract expected of a transport.
    Confirm whether callers share one instance across threads.

    """

    def __init__(self, address, timeouts=None, eom=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class (here,
            the path of a unix socket)
          - timeouts: a pair of timeouts, used on connect and on
            read/write respectively
          - eom: an identifier to be used as end-of-message; the upper
            layer guarantees that this identifier will not appear in any
            message

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use DEF_CTMO and DEF_RWTO (by default 10 and
        60 seconds respectively).

        Note that on reading data, since the timeout applies to an
        individual receive, the total duration may be longer than the
        timeout value passed (we make a hard limit at about twice the read
        timeout).

        Raises:
          - TimeoutError: if connecting timed out
          - NoMasterError: if the socket does not exist or refuses
            connections
          - socket.error: for any other connection failure

        On any of these errors the socket is closed before re-raising.

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        self._buffer = ""
        # Complete messages already received but not yet returned by Recv
        self._msgs = collections.deque()

        if eom is None:
            self.eom = '\3'
        else:
            self.eom = eom

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.settimeout(self._ctimeout)
            try:
                self.socket.connect(address)
            except socket.timeout as err:
                # Must come before socket.error, of which it is a subclass
                raise TimeoutError("Connect timed out: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] in (errno.ENOENT,
                                                errno.ECONNREFUSED):
                    raise NoMasterError((address,))
                raise
            self.socket.settimeout(self._rwtimeout)
        except (socket.error, NoMasterError, TimeoutError):
            # Don't leak the socket on any connect failure, including a
            # connect timeout (TimeoutError is not a socket.error)
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        Raises:
          - ProtocolError: if the connection has been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        Raises:
          - EncodingError: if the message contains the end-of-message
            marker
          - ProtocolError: if the connection is closed
          - TimeoutError: if sending timed out

        """
        if self.eom in msg:
            raise EncodingError("Message terminator found in payload")
        self._CheckSocket()
        try:
            # sendall() either transmits the whole buffer or raises
            self.socket.sendall(msg + self.eom)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        Raises:
          - ProtocolError: if the connection is closed
          - TimeoutError: if a single receive or the overall wait timed out
          - ConnectionClosedError: if the peer closed the connection

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    # socket.timeout is a subclass of socket.error, so it
                    # has to be handled first or this branch is unreachable
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last element is an incomplete message (possibly empty);
            # keep it buffered until its terminator arrives
            new_msgs = (self._buffer + data).split(self.eom)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket, if still open."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class;
            defaults to constants.MASTER_SOCKET
          - timeouts: a pair of timeouts, to be used on connect and
            read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        The transport is instantiated immediately (for the default
        Transport class this opens the connection), so its errors are
        raised from here.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        A new transport is only created when there is none, e.g. after
        _CloseTransport dropped a failed one.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        The transport reference is dropped even if closing it fails.

        """
        old_transp = self.transport
        if old_transp is None:
            return
        self.transport = None
        try:
            old_transp.Close()
        except Exception:
            # Deliberately best-effort: the caller is already propagating
            # a more relevant error and only wants the transport gone.
            pass

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        Arguments:
          - method: the request name (one of the REQ_* constants)
          - args: the request arguments; must be serializable by
            serializer.DumpJson

        Returns the KEY_RESULT value of a successful response.

        Raises:
          - any error from the transport; the transport is then closed
            and re-created on the next call
          - DecodingError: if the response cannot be parsed or does not
            have the expected format
          - RequestError: if the server reported a failure that
            errors.MaybeRaise did not raise itself

        """
        # Build request
        request = {
            KEY_METHOD: method,
            KEY_ARGS: args,
        }

        # Serialize the request
        send_data = serializer.DumpJson(request, indent=False)

        # Send request and wait for response
        try:
            self._InitTransport()
            result = self.transport.Call(send_data)
        except Exception:
            # Drop the possibly broken transport so the next call
            # reconnects, then propagate the original error
            self._CloseTransport()
            raise

        # Parse the result; DecodingError is a ProtocolError, so callers
        # catching ProtocolError are unaffected
        try:
            data = serializer.LoadJson(result)
        except Exception as err:
            raise DecodingError("Error while deserializing response: %s" %
                                str(err))

        # Validate response
        if (not isinstance(data, dict) or
                KEY_SUCCESS not in data or
                KEY_RESULT not in data):
            raise DecodingError("Invalid response from server: %s" % str(data))

        result = data[KEY_RESULT]

        if not data[KEY_SUCCESS]:
            # Give errors.MaybeRaise the chance to raise a specific error
            errors.MaybeRaise(result)
            raise RequestError(result)

        return result

    def SetQueueDrainFlag(self, drain_flag):
        """Send a REQ_QUEUE_SET_DRAIN_FLAG request with drain_flag."""
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SubmitJob(self, ops):
        """Submit a single job made of the given opcodes.

        Each opcode is sent as the value of its __getstate__() method.

        """
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        """Submit several jobs, each given as a list of opcodes."""
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        """Send a REQ_CANCEL_JOB request for job_id."""
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        """Send a REQ_ARCHIVE_JOB request for job_id."""
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Send a REQ_AUTOARCHIVE_JOBS request.

        A server-side timeout of (DEF_RWTO - 1) // 2 seconds is passed
        along with age; presumably this keeps the server's work well
        within the client's default read timeout.

        """
        # Floor division keeps this an integer (29) on any Python version
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Wait until the job differs from the given previous state.

        The request is repeated, each time with a server-side timeout of
        (DEF_RWTO - 1) // 2 seconds, until the result is something other
        than constants.JOB_NOTCHANGED, which is then returned.

        """
        # Floor division keeps this an integer (29) on any Python version
        timeout = (DEF_RWTO - 1) // 2
        while True:
            result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                                     (job_id, fields, prev_job_info,
                                      prev_log_serial, timeout))
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def QueryJobs(self, job_ids, fields):
        """Send a REQ_QUERY_JOBS request for the given jobs and fields."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Send a REQ_QUERY_INSTANCES request."""
        return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Send a REQ_QUERY_NODES request."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Send a REQ_QUERY_EXPORTS request."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Send a REQ_QUERY_CLUSTER_INFO request (no arguments)."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Send a REQ_QUERY_CONFIG_VALUES request for the given fields."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
382 383 384 # TODO: class Server(object) 385