Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for the unix socket protocol 
 32   
 33  This module implements the local unix socket protocol. You only need 
 34  this module and the opcodes module in the client program in order to 
 35  communicate with the master. 
 36   
 37  The module is also used by the master daemon. 
 38   
 39  """ 
 40   
 41  import socket 
 42  import collections 
 43  import time 
 44  import errno 
 45  import logging 
 46   
 47  from ganeti import serializer 
 48  from ganeti import constants 
 49  from ganeti import errors 
 50  from ganeti import utils 
 51  from ganeti import objects 
 52  from ganeti import pathutils 
 53   
 54   
 55  KEY_METHOD = constants.LUXI_KEY_METHOD 
 56  KEY_ARGS = constants.LUXI_KEY_ARGS 
 57  KEY_SUCCESS = constants.LUXI_KEY_SUCCESS 
 58  KEY_RESULT = constants.LUXI_KEY_RESULT 
 59  KEY_VERSION = constants.LUXI_KEY_VERSION 
 60   
 61  REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB 
 62  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE 
 63  REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS 
 64  REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE 
 65  REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB 
 66  REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB 
 67  REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY 
 68  REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS 
 69  REQ_QUERY = constants.LUXI_REQ_QUERY 
 70  REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS 
 71  REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS 
 72  REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES 
 73  REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES 
 74  REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS 
 75  REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS 
 76  REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS 
 77  REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES 
 78  REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO 
 79  REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS 
 80  REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG 
 81  REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE 
 82  REQ_ALL = constants.LUXI_REQ_ALL 
 83   
 84  DEF_CTMO = constants.LUXI_DEF_CTMO 
 85  DEF_RWTO = constants.LUXI_DEF_RWTO 
 86  WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT 
class ProtocolError(errors.LuxiError):
  """Base class for errors in the LUXI protocol.

  Raised directly for malformed requests/responses and for use of a
  closed connection; the more specific LUXI errors of this module
  derive from it.

  """
91
class ConnectionClosedError(ProtocolError):
  """The peer closed the connection (a read returned no data)."""
95
class TimeoutError(ProtocolError): # pylint: disable=W0622
  """A LUXI operation (connect, send or receive) timed out.

  Note that inside this module the name shadows the builtin
  C{TimeoutError} of Python 3.

  """
99
class RequestError(ProtocolError):
  """The request itself was rejected.

  This covers problems with the format or the handling of a request,
  not failures of the requested operation (e.g. an instance failing to
  start).

  Typical causes:
    - a submitted job contained invalid data
    - a query lacked required fields

  """
112
class NoMasterError(ProtocolError):
  """The master daemon cannot be reached.

  Raised when connecting to the master socket fails because the socket
  does not exist (ENOENT) or the connection is refused (ECONNREFUSED),
  i.e. the daemon is not running or its socket was removed.

  """
121
class PermissionError(ProtocolError): # pylint: disable=W0622
  """Access to the master socket was denied (EPERM or EACCES).

  The calling user lacks the rights needed to connect. Inside this
  module the name shadows the builtin C{PermissionError} of Python 3.

  """
129
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  NOTE(review): this implementation holds no locks and keeps a shared
  receive buffer, so sharing one instance between threads is presumably
  not safe; confirm before relying on the requirement above.

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, the module defaults L{DEF_CTMO} (connect) and
    L{DEF_RWTO} (read/write) are used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the
    read timeout).

    @raise TimeoutError: if no connection is made within the connect
        timeout
    @raise NoMasterError: if the socket is missing or refuses connections
    @raise PermissionError: if access to the socket is denied

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect; _Connect raises utils.RetryAgain while the
      # server's listen backlog is full, presumably making utils.Retry
      # try again (every 1.0s) until the connect timeout expires
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, ProtocolError):
      # ProtocolError covers NoMasterError, PermissionError and
      # TimeoutError; all of them must release the socket, not only
      # NoMasterError
      if self.socket is not None:
        self.socket.close()
        self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Connect C{sock} to C{address}, translating socket errors.

    @raise TimeoutError: if the connect attempt times out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: on EAGAIN (server backlog full), so that the
        caller's retry loop tries again
    @raise socket.error: for any other socket error

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against errors without arguments, as Recv does
      error_code = err.args[0] if err.args else None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the connection has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise ProtocolError: if C{msg} contains the message terminator or
        the connection is closed
    @raise TimeoutError: if sending times out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # sendall() keeps writing until all data is sent or it raises; on
      # an error it is unknown how much of the message was transmitted
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we read data in chunks, each read bounded by the
    read/write timeout; no new read is started once that timeout has
    elapsed since the call began, so the total time stays below roughly
    twice the read/write timeout.

    @return: one message, without the terminator
    @raise TimeoutError: if a single read or the overall receive times out
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last element is an incomplete message (possibly empty) and is
      # kept in the buffer until the rest arrives
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; calling this more than once is harmless."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
274
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request, as accepted by serializer.LoadJson
  @rtype: tuple
  @return: C{(method, args, version)}; C{version} is C{None} when the
      request carries none
  @raise ProtocolError: if the message cannot be parsed, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
301
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response, as accepted by serializer.LoadJson
  @rtype: tuple
  @return: C{(success, result, version)}; C{version} is C{None} when the
      response carries none
  @raise ProtocolError: if the message cannot be deserialized or is not a
      dictionary containing both the success and the result keys

  """
  # Parse the result; KeyboardInterrupt derives from BaseException, not
  # Exception, so it still propagates unchanged
  try:
    data = serializer.LoadJson(msg)
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
323
def FormatResponse(success, result, version=None):
  """Builds and serializes a LUXI response message.

  @param success: whether the request succeeded
  @param result: the result of the request, or the error information
      when C{success} is false
  @param version: protocol version; only included when not C{None}
  @return: the response serialized by serializer.DumpJson

  """
  fields = [(KEY_SUCCESS, success), (KEY_RESULT, result)]
  if version is not None:
    fields.append((KEY_VERSION, version))
  response = dict(fields)

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
340
def FormatRequest(method, args, version=None):
  """Builds and serializes a LUXI request message.

  @param method: the request name
  @param args: the request arguments
  @param version: protocol version; only included when not C{None}
  @return: the request serialized by serializer.DumpJson

  """
  items = [(KEY_METHOD, method), (KEY_ARGS, args)]
  if version is not None:
    items.append((KEY_VERSION, version))

  # Serialize the request
  return serializer.DumpJson(dict(items))
357
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @type transport_cb: callable
  @param transport_cb: function taking the serialized request and
      returning the serialized response
  @param method: the request name
  @param args: the request arguments
  @param version: protocol version to send; if the response carries a
      version, it must be equal to this one
  @return: the result of a successful request
  @raise errors.LuxiError: if the response version differs
  @raise RequestError: if the request failed and C{errors.MaybeRaise}
      did not raise for the returned result

  """
  assert callable(transport_cb)

  # Serialize, send and wait for the response, then parse it
  reply = transport_cb(FormatRequest(method, args, version=version))
  (success, result, resp_version) = ParseResponse(reply)

  # The version is only verified when the response includes one
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
382
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if not
        given, L{pathutils.MASTER_SOCKET} is used
      - timeouts: a pair of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    The transport is instantiated right away (for L{Transport} this
    means connecting to C{address}).

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    A new transport is only created when none is currently set, e.g.
    after a failed call dropped the previous one.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    The reference is dropped before closing, so even if closing fails
    the next call creates a fresh transport.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      # Best effort only: the transport is discarded either way
      pass

  def _SendMethodCall(self, data):
    """Send serialized request data and return the raw response.

    On any error the transport is closed and the error re-raised, so
    the next call starts with a new transport.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: the request name, e.g. one of the module's C{REQ_*}
        constants
    @type args: list or tuple
    @param args: the request arguments
    @raise errors.ProgrammerError: if C{args} is neither a list nor a tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a L{REQ_SET_DRAIN_FLAG} request with C{drain_flag}."""
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Send a L{REQ_SET_WATCHER_PAUSE} request with C{until}."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submit one job made of C{ops} (objects providing C{__getstate__}).

    """
    # A real list (not a lazy map object on Python 3) so it serializes
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitJobToDrainedQueue(self, ops):
    """Like L{SubmitJob}, but using L{REQ_SUBMIT_JOB_TO_DRAINED_QUEUE}.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submit several jobs at once; C{jobs} is a list of opcode lists.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Convert a job ID to an integer before sending it.

    @param request_name: the request name, used in the error message
    @param job_id: the job ID, as an integer or anything C{int()} accepts
    @rtype: int
    @raise RequestError: if C{job_id} cannot be converted to an integer

    """
    try:
      return int(job_id)
    except (TypeError, ValueError):
      # TypeError covers values such as None or lists
      raise RequestError("Invalid parameter passed to %s as job id:"
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id):
    """Send a L{REQ_CANCEL_JOB} request for C{job_id}."""
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Send a L{REQ_ARCHIVE_JOB} request for C{job_id}."""
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Send a L{REQ_CHANGE_JOB_PRIORITY} request for C{job_id}."""
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Send a L{REQ_AUTO_ARCHIVE_JOBS} request for C{age}.

    The server-side timeout sent along is derived from L{DEF_RWTO}.

    """
    # Presumably kept well below our read/write timeout so the server
    # answers in time; floor division keeps the integer result of
    # Python 2's "/" on Python 3 too (assuming DEF_RWTO is an integer)
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)
    @raise RequestError: if C{job_id} is not convertible to an integer

    """
    assert timeout >= 0, "Timeout can not be negative"
    # Normalize the job ID like all other job-related requests do
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Repeat L{WaitForJobChangeOnce} until the result is not
    C{constants.JOB_NOTCHANGED}, and return that result.

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a L{REQ_QUERY_JOBS} request."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_INSTANCES} request."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NODES} request."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_GROUPS} request."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NETWORKS} request."""
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a L{REQ_QUERY_EXPORTS} request."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a L{REQ_QUERY_CLUSTER_INFO} request (no arguments)."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a L{REQ_QUERY_CONFIG_VALUES} request."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Send a L{REQ_QUERY_TAGS} request."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
587