Package ganeti :: Module luxi
[hide private]
[frames | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37  import warnings 
 38   
 39  from ganeti import serializer 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import utils 
 43  from ganeti import objects 
 44   
 45   
 46  KEY_METHOD = "method" 
 47  KEY_ARGS = "args" 
 48  KEY_SUCCESS = "success" 
 49  KEY_RESULT = "result" 
 50  KEY_VERSION = "version" 
 51   
 52  REQ_SUBMIT_JOB = "SubmitJob" 
 53  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 54  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 55  REQ_CANCEL_JOB = "CancelJob" 
 56  REQ_ARCHIVE_JOB = "ArchiveJob" 
 57  REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs" 
 58  REQ_QUERY = "Query" 
 59  REQ_QUERY_FIELDS = "QueryFields" 
 60  REQ_QUERY_JOBS = "QueryJobs" 
 61  REQ_QUERY_INSTANCES = "QueryInstances" 
 62  REQ_QUERY_NODES = "QueryNodes" 
 63  REQ_QUERY_GROUPS = "QueryGroups" 
 64  REQ_QUERY_EXPORTS = "QueryExports" 
 65  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 66  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 67  REQ_QUERY_TAGS = "QueryTags" 
 68  REQ_QUERY_LOCKS = "QueryLocks" 
 69  REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" 
 70  REQ_SET_WATCHER_PAUSE = "SetWatcherPause" 
 71   
 72  DEF_CTMO = 10 
 73  DEF_RWTO = 60 
 74   
 75  # WaitForJobChange timeout 
 76  WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 
class ProtocolError(errors.LuxiError):
  """Denotes an error in the LUXI protocol.

  Base class of the other exceptions defined in this module.

  """
81
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised when the peer closes the connection while a message is being read.

  """
85
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised when connecting, sending or receiving exceeds its timeout.

  """
89
class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
102
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """
111
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights.

  """
119
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    connected = False
    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
      connected = True
    finally:
      # Release the socket on any failure (including timeouts and
      # permission errors), not only on socket.error/NoMasterError; the
      # original exception keeps propagating unchanged
      if not connected and self.socket is not None:
        self.socket.close()
        self.socket = None

  @staticmethod
  def _Connect(sock, address, timeout):
    """Makes a single connection attempt.

    Socket errors are translated as follows: a timeout becomes
    L{TimeoutError}; ENOENT/ECONNREFUSED become L{NoMasterError};
    EPERM/EACCES become L{PermissionError}; EAGAIN raises
    C{utils.RetryAgain}. Any other socket error is re-raised as is.

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against exceptions without arguments so that the original
      # error is re-raised instead of being masked by an IndexError
      error_code = None
      if err.args:
        error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises L{ProtocolError} if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response. The
    message must not contain the end-of-message marker, which is
    appended here.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: if sendall fails there is no way to tell how much of the
      # message was actually sent
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          # EAGAIN: simply retry the read
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # Everything before a terminator is a complete message; the last
      # element (possibly empty) is the start of the next one
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
264
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: Serialized request
  @return: Tuple of (method, args, version); version is None if the
    request did not contain one
  @raise ProtocolError: If the message cannot be parsed, is not a
    dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
291
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: Serialized response
  @return: Tuple of (success, result, version); version is None if the
    response did not contain one
  @raise ProtocolError: If the message cannot be deserialized or is not
    a dictionary containing both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never convert a user interrupt into a protocol error
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
313
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: Value stored under the success key
  @param result: Value stored under the result key
  @param version: If not None, stored under the version key
  @return: The response as serialized by C{serializer.DumpJson}

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
330
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: Value stored under the method key
  @param args: Value stored under the arguments key
  @param version: If not None, stored under the version key
  @return: The request as serialized by C{serializer.DumpJson}

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
347
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: Callable taking the serialized request and
    returning the serialized response
  @param method: Method name
  @param args: Method arguments
  @param version: Protocol version sent with the request (if not None)
    and compared against the version found in the response, if any
  @return: The result part of a successful response
  @raise errors.LuxiError: If the response carries a version different
    from C{version}
  @raise RequestError: If the response signals failure and
    C{errors.MaybeRaise} did not raise for the returned result

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  # Give errors.MaybeRaise the first chance to raise from the failure
  # payload; otherwise fall back to a generic request error
  errors.MaybeRaise(result)
  raise RequestError(result)
372
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If address is not passed, C{constants.MASTER_SOCKET} is used. If
    timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Drop our reference first so that the transport is considered
      # closed even if Close() fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Sends a serialized request and returns the serialized response.

    On any error the transport is closed (a later call creates a new
    one) and the exception is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Sends a "SetDrainFlag" request with the flag as its argument.

    """
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Sends a "SetWatcherPause" request with C{[until]} as arguments.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submits one job made of the given opcodes.

    Each opcode is sent as the value of its C{__getstate__} method.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submits several jobs, each one being a list of opcodes.

    Each opcode is sent as the value of its C{__getstate__} method.

    """
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Sends a "CancelJob" request with the job ID as its argument.

    """
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Sends an "ArchiveJob" request with the job ID as its argument.

    """
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Sends an "AutoArchiveJobs" request with C{(age, timeout)}.

    The timeout is derived from the default read/write timeout.

    """
    # Explicit floor division keeps the timeout an integer
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job changes.

    Calls L{WaitForJobChangeOnce} repeatedly until its result differs
    from C{constants.JOB_NOTCHANGED} and returns that result.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, filter_):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type filter_: None or list
    @param filter_: Query filter
    @rtype: L{objects.QueryResponse}

    """
    req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
    result = self.CallMethod(REQ_QUERY, req.ToDict())
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    req = objects.QueryFieldsRequest(what=what, fields=fields)
    result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Sends a "QueryJobs" request with C{(job_ids, fields)}.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Sends a "QueryInstances" request with C{(names, fields, use_locking)}.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Sends a "QueryNodes" request with C{(names, fields, use_locking)}.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Sends a "QueryGroups" request with C{(names, fields, use_locking)}.

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Sends a "QueryExports" request with C{(nodes, use_locking)}.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Sends a "QueryClusterInfo" request without arguments.

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Sends a "QueryConfigValues" request with the fields as argument.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Sends a "QueryTags" request with C{(kind, name)}.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Sends a "QueryLocks" request with C{(fields, sync)}.

    Deprecated: a warning is emitted on every call, use L{Query} with
    C{constants.QR_LOCK} instead.

    """
    warnings.warn("This LUXI call is deprecated and will be removed, use"
                  " Query(\"%s\", ...) instead" % constants.QR_LOCK)
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
558