Package ganeti :: Module luxi
[hide private]
[frames | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37   
 38  from ganeti import serializer 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42  from ganeti import objects 
 43   
 44   
 45  KEY_METHOD = "method" 
 46  KEY_ARGS = "args" 
 47  KEY_SUCCESS = "success" 
 48  KEY_RESULT = "result" 
 49  KEY_VERSION = "version" 
 50   
 51  REQ_SUBMIT_JOB = "SubmitJob" 
 52  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 53  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 54  REQ_CANCEL_JOB = "CancelJob" 
 55  REQ_ARCHIVE_JOB = "ArchiveJob" 
 56  REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs" 
 57  REQ_QUERY = "Query" 
 58  REQ_QUERY_FIELDS = "QueryFields" 
 59  REQ_QUERY_JOBS = "QueryJobs" 
 60  REQ_QUERY_INSTANCES = "QueryInstances" 
 61  REQ_QUERY_NODES = "QueryNodes" 
 62  REQ_QUERY_GROUPS = "QueryGroups" 
 63  REQ_QUERY_EXPORTS = "QueryExports" 
 64  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 65  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 66  REQ_QUERY_TAGS = "QueryTags" 
 67  REQ_SET_DRAIN_FLAG = "SetDrainFlag" 
 68  REQ_SET_WATCHER_PAUSE = "SetWatcherPause" 
 69   
 70  #: List of all LUXI requests 
 71  REQ_ALL = frozenset([ 
 72    REQ_ARCHIVE_JOB, 
 73    REQ_AUTO_ARCHIVE_JOBS, 
 74    REQ_CANCEL_JOB, 
 75    REQ_QUERY, 
 76    REQ_QUERY_CLUSTER_INFO, 
 77    REQ_QUERY_CONFIG_VALUES, 
 78    REQ_QUERY_EXPORTS, 
 79    REQ_QUERY_FIELDS, 
 80    REQ_QUERY_GROUPS, 
 81    REQ_QUERY_INSTANCES, 
 82    REQ_QUERY_JOBS, 
 83    REQ_QUERY_NODES, 
 84    REQ_QUERY_TAGS, 
 85    REQ_SET_DRAIN_FLAG, 
 86    REQ_SET_WATCHER_PAUSE, 
 87    REQ_SUBMIT_JOB, 
 88    REQ_SUBMIT_MANY_JOBS, 
 89    REQ_WAIT_FOR_JOB_CHANGE, 
 90    ]) 
 91   
 92  DEF_CTMO = 10 
 93  DEF_RWTO = 60 
 94   
 95  # WaitForJobChange timeout 
 96  WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 
class ProtocolError(errors.LuxiError):
  """Error in the LUXI protocol.

  This is the base class of the more specific LUXI errors defined in
  this module.

  """
101
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  """
105
class TimeoutError(ProtocolError):
  """An operation timed out.

  """
109
class RequestError(ProtocolError):
  """A request could not be processed.

  The problem lies in the format of the request or in its handling, as
  opposed to (e.g.) a failure while starting up an instance.

  Typical situations leading to this exception:
    - a job was submitted with wrong job data
    - a query did not contain the required fields

  """
122
class NoMasterError(ProtocolError):
  """The master is unreachable.

  Either the master daemon is not running or its socket has been
  removed.

  """
131
class PermissionError(ProtocolError):
  """Connecting to the master socket was denied.

  The user lacks the rights needed to access the socket.

  """
139
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    If connecting fails for any reason, the socket is closed again
    before the exception is passed on to the caller.

    @raise TimeoutError: if the connection could not be established
        within the connect timeout
    @raise NoMasterError: if the socket does not exist or refuses
        connections
    @raise PermissionError: if access to the socket is denied

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received so far which doesn't form a complete message yet
    self._buffer = ""
    # Complete messages received, but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception: # pylint: disable=W0703
      # No matter what went wrong (including timeouts and permission
      # problems), the half-initialized socket must not be leaked; the
      # original exception is re-raised unchanged
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Makes a single connection attempt.

    Socket-level errors are translated into LUXI errors where possible;
    all other socket errors are passed on unchanged.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout set on the socket before connecting
    @raise TimeoutError: if the attempt timed out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: on EAGAIN, i.e. the attempt should be repeated

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Errors without arguments carry no error code and are re-raised
      if err.args:
        error_code = err.args[0]
      else:
        error_code = None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the message, which must not contain the message terminator
    @raise ProtocolError: if the terminator is part of the message or the
        connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the oldest message not yet returned
    @raise ProtocolError: if the connection is closed
    @raise TimeoutError: if no complete message arrived in time
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    # The deadline is only checked before a read and a single read can take
    # up to _rwtimeout itself, hence the overall limit of 2x_rwtimeout
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last element is the (possibly empty) start of the next message
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket.

    Calling this on an already closed transport does nothing.

    """
    if self.socket is not None:
      self.socket.close()
      self.socket = None
284
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @rtype: tuple
  @return: tuple of (method, arguments, version); the version is None if
      the request doesn't contain one
  @raise ProtocolError: if the message can't be deserialized, is not a
      dictionary or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  # The version is optional, method and arguments are not
  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
311
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @rtype: tuple
  @return: tuple of (success, result, version); the version is None if
      the response doesn't contain one
  @raise ProtocolError: if the message can't be deserialized or is not a
      dictionary containing both the success and the result key

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
333
def FormatResponse(success, result, version=None):
  """Builds and serializes a LUXI response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: if not None, stored under the version key
  @return: the response as serialized by L{serializer.DumpJson}

  """
  payload = {KEY_SUCCESS: success, KEY_RESULT: result}

  # The version key is left out completely unless a version was given
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)

  return serializer.DumpJson(payload)
350
def FormatRequest(method, args, version=None):
  """Builds and serializes a LUXI request message.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @param version: if not None, stored under the version key
  @return: the request as serialized by L{serializer.DumpJson}

  """
  payload = {KEY_METHOD: method, KEY_ARGS: args}

  # The version key is left out completely unless a version was given
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload)
367
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Performs a LUXI call through a transport callback.

  @param transport_cb: callable receiving the serialized request and
      returning the serialized response
  @param method: name of the method to call
  @param args: arguments of the method
  @param version: protocol version put into the request and compared
      against the version of the response, if the latter has one
  @return: the result of a successful call
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the call was not successful and
      L{errors.MaybeRaise} didn't raise anything for the result

  """
  assert callable(transport_cb)

  # Round trip: serialize, hand over to the transport, deserialize
  reply = transport_cb(FormatRequest(method, args, version=version))
  (ok, payload, reply_version) = ParseResponse(reply)

  # Responses without a version are accepted as they are
  if reply_version is not None and reply_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, reply_version))

  if not ok:
    # MaybeRaise gets the first chance to raise; RequestError is the fallback
    errors.MaybeRaise(payload)
    raise RequestError(payload)

  return payload
392
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If address is not passed, L{constants.MASTER_SOCKET} is used.

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport first, so that it's gone even if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Sends a serialized request and returns the serialized response.

    On any error the transport is closed before the exception is
    re-raised; the next call creates a new transport.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: name of the LUXI method
    @type args: list or tuple
    @param args: arguments of the method
    @raise errors.ProgrammerError: if args is neither a list nor a tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Sends a L{REQ_SET_DRAIN_FLAG} request with the given flag.

    """
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Sends a L{REQ_SET_WATCHER_PAUSE} request with the given value.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submits a job.

    @param ops: the opcodes of the job; they are sent in the form returned
        by their C{__getstate__} method

    """
    # A real list (not a lazy iterator) is needed for the serialization
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submits several jobs with one request.

    @param jobs: list of jobs, each of them being a list of opcodes

    """
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  def CancelJob(self, job_id):
    """Sends a L{REQ_CANCEL_JOB} request for the given job ID.

    """
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Sends a L{REQ_ARCHIVE_JOB} request for the given job ID.

    """
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def AutoArchiveJobs(self, age):
    """Sends a L{REQ_AUTO_ARCHIVE_JOBS} request.

    @param age: passed on unchanged as first argument of the request

    """
    # Derived from the default read/write timeout; floor division keeps
    # the value an integer
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job has changed.

    L{WaitForJobChangeOnce} is called repeatedly until it returns
    something else than L{constants.JOB_NOTCHANGED}.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Sends a L{REQ_QUERY_JOBS} request.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Sends a L{REQ_QUERY_INSTANCES} request.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Sends a L{REQ_QUERY_NODES} request.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Sends a L{REQ_QUERY_GROUPS} request.

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Sends a L{REQ_QUERY_EXPORTS} request.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Sends a L{REQ_QUERY_CLUSTER_INFO} request.

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Sends a L{REQ_QUERY_CONFIG_VALUES} request.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Sends a L{REQ_QUERY_TAGS} request.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
574