Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012, 2014 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37   
 38  from ganeti import compat 
 39  from ganeti import serializer 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import utils 
 43  from ganeti import objects 
 44  from ganeti import pathutils 
 45   
 46   
 47  KEY_METHOD = "method" 
 48  KEY_ARGS = "args" 
 49  KEY_SUCCESS = "success" 
 50  KEY_RESULT = "result" 
 51  KEY_VERSION = "version" 
 52   
 53  REQ_SUBMIT_JOB = "SubmitJob" 
 54  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 55  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 56  REQ_CANCEL_JOB = "CancelJob" 
 57  REQ_ARCHIVE_JOB = "ArchiveJob" 
 58  REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority" 
 59  REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs" 
 60  REQ_QUERY = "Query" 
 61  REQ_QUERY_FIELDS = "QueryFields" 
 62  REQ_QUERY_JOBS = "QueryJobs" 
 63  REQ_QUERY_INSTANCES = "QueryInstances" 
 64  REQ_QUERY_NODES = "QueryNodes" 
 65  REQ_QUERY_GROUPS = "QueryGroups" 
 66  REQ_QUERY_NETWORKS = "QueryNetworks" 
 67  REQ_QUERY_EXPORTS = "QueryExports" 
 68  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 69  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 70  REQ_QUERY_TAGS = "QueryTags" 
 71  REQ_SET_DRAIN_FLAG = "SetDrainFlag" 
 72  REQ_SET_WATCHER_PAUSE = "SetWatcherPause" 
 73   
 74  #: List of all LUXI requests 
 75  REQ_ALL = compat.UniqueFrozenset([ 
 76    REQ_ARCHIVE_JOB, 
 77    REQ_AUTO_ARCHIVE_JOBS, 
 78    REQ_CANCEL_JOB, 
 79    REQ_CHANGE_JOB_PRIORITY, 
 80    REQ_QUERY, 
 81    REQ_QUERY_CLUSTER_INFO, 
 82    REQ_QUERY_CONFIG_VALUES, 
 83    REQ_QUERY_EXPORTS, 
 84    REQ_QUERY_FIELDS, 
 85    REQ_QUERY_GROUPS, 
 86    REQ_QUERY_INSTANCES, 
 87    REQ_QUERY_JOBS, 
 88    REQ_QUERY_NODES, 
 89    REQ_QUERY_NETWORKS, 
 90    REQ_QUERY_TAGS, 
 91    REQ_SET_DRAIN_FLAG, 
 92    REQ_SET_WATCHER_PAUSE, 
 93    REQ_SUBMIT_JOB, 
 94    REQ_SUBMIT_MANY_JOBS, 
 95    REQ_WAIT_FOR_JOB_CHANGE, 
 96    ]) 
 97   
 98  DEF_CTMO = 10 
 99  DEF_RWTO = 60 
100   
101  # WaitForJobChange timeout 
102  WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 
class ProtocolError(errors.LuxiError):
  """Base class for errors in the LUXI protocol layer.

  The more specific protocol exceptions of this module derive from it.

  """


class ConnectionClosedError(ProtocolError):
  """The connection was closed by the other side.

  Raised by L{Transport.Recv} when reading from the socket yields no data.

  """


class TimeoutError(ProtocolError):
  """A LUXI operation (connect, send or receive) timed out.

  Note: on Python 3 this name shadows the builtin C{TimeoutError} within
  this module.

  """


class RequestError(ProtocolError):
  """The request itself was wrong or could not be handled.

  This is about the format or handling of the request, not about the
  operation it asked for (e.g. it does not mean an instance failed to
  start).

  Typical causes:
    - a job could not be submitted because its data was wrong
    - a query was missing required fields

  """


class NoMasterError(ProtocolError):
  """The master daemon could not be reached.

  Either the master daemon is not running or its socket has been
  removed; L{Transport} raises it when connecting fails with C{ENOENT} or
  C{ECONNREFUSED}.

  """


class PermissionError(ProtocolError):
  """Connecting to the master socket was denied.

  The calling user lacks the rights to use the socket; L{Transport}
  raises it when connecting fails with C{EPERM} or C{EACCES}. Note: on
  Python 3 this name shadows the builtin C{PermissionError} within this
  module.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    If connecting fails for any reason, the socket is closed before the
    error is propagated.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Besides socket.error and NoMasterError, connecting can also fail
      # with TimeoutError or PermissionError; close the socket in every
      # case instead of leaking it
      self.Close()
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Make one attempt at connecting C{sock} to C{address}.

    @raise TimeoutError: if the connect attempt timed out
    @raise NoMasterError: on C{ENOENT} or C{ECONNREFUSED}
    @raise PermissionError: on C{EPERM} or C{EACCES}
    @raise utils.RetryAgain: on C{EAGAIN}, presumably so that
        L{utils.Retry} makes another attempt

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against errors without arguments instead of masking the
      # original error with an IndexError
      error_code = err.args[0] if err.args else None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise ProtocolError: if the payload contains the message terminator
        or the connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # sendall either transmits all of the data or raises an exception
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @raise TimeoutError: if a single read or the whole receive timed out
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      data = self._RecvChunk()
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last piece is an incomplete message (possibly empty); keep it
      # buffered until its terminator arrives
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def _RecvChunk(self):
    """Read one chunk from the socket, retrying transient errors.

    C{EAGAIN} and C{EINTR} are retried, a socket timeout is converted to
    L{TimeoutError} and any other socket error is propagated.

    """
    while True:
      try:
        return self.socket.recv(4096)
      except socket.timeout as err:
        raise TimeoutError("Receive timeout: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] in (errno.EAGAIN, errno.EINTR):
          continue
        raise

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None


def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @rtype: tuple
  @return: C{(method, args, version)}, where C{version} is C{None} if the
      request does not carry one
  @raise ProtocolError: if the message cannot be parsed, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  # pylint: disable=E1103
  (method, args, version) = [request.get(key, None)
                             for key in (KEY_METHOD, KEY_ARGS, KEY_VERSION)]
  # pylint: enable=E1103

  # The version is optional, method and arguments are not
  if any(value is None for value in (method, args)):
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)


def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @rtype: tuple
  @return: C{(success, result, version)}, where C{version} is C{None} if
      the response does not carry one
  @raise ProtocolError: if the message cannot be deserialized or is not a
      dictionary containing both the success flag and the result

  """
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Let the user's interrupt through unchanged instead of wrapping it
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  well_formed = (isinstance(data, dict) and
                 KEY_SUCCESS in data and KEY_RESULT in data)
  if not well_formed:
    raise ProtocolError("Invalid response from server: %r" % data)

  success = data[KEY_SUCCESS]
  result = data[KEY_RESULT]
  version = data.get(KEY_VERSION, None) # pylint: disable=E1103
  return (success, result, version)


def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: whether the request succeeded
  @param result: the result (or error) to send back
  @param version: protocol version to include; omitted when C{None}
  @return: the serialized response

  """
  fields = [(KEY_SUCCESS, success), (KEY_RESULT, result)]
  if version is not None:
    fields.append((KEY_VERSION, version))
  response = dict(fields)

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)


def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: the LUXI method name
  @param args: the method arguments
  @param version: protocol version to include; omitted when C{None}
  @return: the serialized request

  """
  fields = [(KEY_METHOD, method), (KEY_ARGS, args)]
  if version is not None:
    fields.append((KEY_VERSION, version))

  return serializer.DumpJson(dict(fields))


def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @type transport_cb: callable
  @param transport_cb: function taking the serialized request and
      returning the serialized response
  @param method: the LUXI method name
  @param args: the method arguments
  @param version: protocol version sent with the request, if any
  @return: the result of a successful request
  @raise errors.LuxiError: if the response carries a version different
      from C{version}
  @raise RequestError: if the server reports a failure and
      L{errors.MaybeRaise} did not raise for it

  """
  assert callable(transport_cb)

  response_msg = transport_cb(FormatRequest(method, args, version=version))
  (success, result, resp_version) = ParseResponse(response_msg)

  # Only compare versions when the server sent one
  mismatch = resp_version is not None and resp_version != version
  if mismatch:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class (defaults
        to L{pathutils.MASTER_SOCKET})
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used. The transport is instantiated right away.

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    The reference is dropped before closing, so even a failed close
    leaves the client ready to create a new transport on the next call.

    """
    if self.transport is None:
      return
    old_transp = self.transport
    self.transport = None
    try:
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      # Closing is best-effort, but don't hide the failure completely
      logging.debug("Ignoring error while closing LUXI transport",
                    exc_info=True)

  def _SendMethodCall(self, data):
    """Send raw request data and return the raw response.

    On any error the transport is closed (a new one is created by the
    next call) and the error is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: the LUXI method name, e.g. one of the C{REQ_*} constants
    @type args: list or tuple
    @param args: the method arguments
    @raise errors.ProgrammerError: if C{args} is not a list or tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submit a single job consisting of the given opcodes.

    """
    # Build a real list: on Python 3 map() returns a lazy iterator, which
    # cannot be serialized as the request arguments
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as a list of opcodes.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Convert a job ID to an integer.

    @param request_name: request name, used in the error message
    @param job_id: the job ID to convert
    @rtype: int
    @raise RequestError: if C{job_id} cannot be converted (including
        C{None})

    """
    try:
      return int(job_id)
    except (TypeError, ValueError):
      raise RequestError("Invalid parameter passed to %s as job id:"
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id):
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    # Computed like WFJC_TIMEOUT; floor division keeps it an integer on
    # every Python version
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until a job changes.

    Calls L{WaitForJobChangeOnce} until its result differs from
    C{constants.JOB_NOTCHANGED}, then returns that result.

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
