Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module for the unix socket protocol 
 23   
 24  This module implements the local unix socket protocol. You only need 
 25  this module and the opcodes module in the client program in order to 
 26  communicate with the master. 
 27   
 28  The module is also used by the master daemon. 
 29   
 30  """ 
 31   
 32  import socket 
 33  import collections 
 34  import time 
 35  import errno 
 36  import logging 
 37   
 38  from ganeti import compat 
 39  from ganeti import serializer 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import utils 
 43  from ganeti import objects 
 44  from ganeti import pathutils 
 45   
 46   
 47  KEY_METHOD = "method" 
 48  KEY_ARGS = "args" 
 49  KEY_SUCCESS = "success" 
 50  KEY_RESULT = "result" 
 51  KEY_VERSION = "version" 
 52   
 53  REQ_SUBMIT_JOB = "SubmitJob" 
 54  REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" 
 55  REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" 
 56  REQ_CANCEL_JOB = "CancelJob" 
 57  REQ_ARCHIVE_JOB = "ArchiveJob" 
 58  REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority" 
 59  REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs" 
 60  REQ_QUERY = "Query" 
 61  REQ_QUERY_FIELDS = "QueryFields" 
 62  REQ_QUERY_JOBS = "QueryJobs" 
 63  REQ_QUERY_INSTANCES = "QueryInstances" 
 64  REQ_QUERY_NODES = "QueryNodes" 
 65  REQ_QUERY_GROUPS = "QueryGroups" 
 66  REQ_QUERY_NETWORKS = "QueryNetworks" 
 67  REQ_QUERY_EXPORTS = "QueryExports" 
 68  REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" 
 69  REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" 
 70  REQ_QUERY_TAGS = "QueryTags" 
 71  REQ_SET_DRAIN_FLAG = "SetDrainFlag" 
 72  REQ_SET_WATCHER_PAUSE = "SetWatcherPause" 
 73   
 74  #: List of all LUXI requests 
 75  REQ_ALL = compat.UniqueFrozenset([ 
 76    REQ_ARCHIVE_JOB, 
 77    REQ_AUTO_ARCHIVE_JOBS, 
 78    REQ_CANCEL_JOB, 
 79    REQ_CHANGE_JOB_PRIORITY, 
 80    REQ_QUERY, 
 81    REQ_QUERY_CLUSTER_INFO, 
 82    REQ_QUERY_CONFIG_VALUES, 
 83    REQ_QUERY_EXPORTS, 
 84    REQ_QUERY_FIELDS, 
 85    REQ_QUERY_GROUPS, 
 86    REQ_QUERY_INSTANCES, 
 87    REQ_QUERY_JOBS, 
 88    REQ_QUERY_NODES, 
 89    REQ_QUERY_TAGS, 
 90    REQ_SET_DRAIN_FLAG, 
 91    REQ_SET_WATCHER_PAUSE, 
 92    REQ_SUBMIT_JOB, 
 93    REQ_SUBMIT_MANY_JOBS, 
 94    REQ_WAIT_FOR_JOB_CHANGE, 
 95    ]) 
 96   
 97  DEF_CTMO = 10 
 98  DEF_RWTO = 60 
 99   
100  # WaitForJobChange timeout 
101  WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 
class ProtocolError(errors.LuxiError):
  """Signals a failure or violation of the LUXI protocol."""
106
class ConnectionClosedError(ProtocolError):
  """The peer closed the connection (e.g. in the middle of a read)."""
110
class TimeoutError(ProtocolError):
  """A LUXI operation did not complete in time.

  Note: within this module the name shadows the Python 3 builtin
  C{TimeoutError}.

  """
114
class RequestError(ProtocolError):
  """The request itself was faulty or could not be handled.

  This reports a problem with the request's format or its handling,
  not a failure of the requested operation (such as an instance
  failing to start).

  Typical triggers include:
    - a job submission failing because the job data was wrong
    - a query failing because required fields were missing

  """
127
class NoMasterError(ProtocolError):
  """The master daemon cannot be reached.

  Either the master daemon is not running, or its socket has been
  removed.

  """
136
class PermissionError(ProtocolError):
  """Connecting to the master socket was refused for lack of rights.

  The calling user does not have the permissions needed to use the
  socket. Note: within this module the name shadows the Python 3
  builtin C{PermissionError}.

  """
144
class Transport:
  """Low-level transport class.

  This is used on the client side.

  It could be replaced by any other class providing the same
  semantics to the Client, namely:
    - can send messages and receive messages
    - safe for multithreading

  On the wire, messages are delimited by C{constants.LUXI_EOM}.

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a (connect, read/write) pair of timeouts, in seconds

    Two timeouts are used since we might want to wait for a long time
    for a response, while the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds
    (L{DEF_CTMO} and L{DEF_RWTO}).

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration may be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    @raise TimeoutError: if connecting did not succeed within the
        connect timeout
    @raise NoMasterError: if nobody is listening at C{address}
    @raise PermissionError: if access to C{address} was denied

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect; _Connect raises utils.RetryAgain while the
      # server's backlog is full, so that utils.Retry tries again
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, ProtocolError):
      # ProtocolError covers NoMasterError, PermissionError and
      # TimeoutError. Previously only NoMasterError was caught here,
      # which left the socket open on timeouts and permission errors.
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Connects a socket, translating socket errors into LUXI errors.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout set on C{sock} for this attempt
    @raise TimeoutError: if the connect attempt timed out
    @raise NoMasterError: on C{ENOENT} or C{ECONNREFUSED}
    @raise PermissionError: on C{EPERM} or C{EACCES}
    @raise utils.RetryAgain: on C{EAGAIN}, to request another attempt

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against errors without an errno, as Recv does
      error_code = err.args[0] if err.args else None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the connection has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the serialized message, without terminator
    @raise ProtocolError: if C{msg} contains the message terminator, or
        the connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # sendall either transmits the whole buffer or raises. After an
      # error, an unknown amount of data may already have been sent.
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the next complete message, without terminator
    @raise TimeoutError: on a receive timeout or when the global limit
        is exceeded
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      # No new read starts after etime; the last one may still block for
      # up to _rwtimeout, hence the 2x global limit
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last element is an incomplete (possibly empty) message; keep
      # it buffered until its terminator arrives
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
289
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @rtype: tuple
  @return: C{(method, args, version)}; C{version} is C{None} if the
      request does not carry one
  @raise ProtocolError: if the message cannot be deserialized, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  # Missing keys come back as None
  (method, args, version) = [request.get(key, None) # pylint: disable=E1103
                             for key in (KEY_METHOD, KEY_ARGS, KEY_VERSION)]

  if None in (method, args) and (method is None or args is None):
    logging.error("LUXI request missing method or arguments: %r", msg)
    detail = ("Invalid LUXI request (no method or arguments"
              " in request): %r")
    raise ProtocolError(detail % msg)

  return (method, args, version)
316
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @rtype: tuple
  @return: C{(success, result, version)}; C{version} is C{None} if the
      response does not carry one
  @raise ProtocolError: if the message cannot be deserialized, or is not
      a dictionary holding both the success flag and the result

  """
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never turn a user interrupt into a protocol error
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  well_formed = (isinstance(data, dict) and
                 KEY_SUCCESS in data and KEY_RESULT in data)
  if not well_formed:
    raise ProtocolError("Invalid response from server: %r" % data)

  success = data[KEY_SUCCESS]
  result = data[KEY_RESULT]
  version = data.get(KEY_VERSION, None) # pylint: disable=E1103
  return (success, result, version)
338
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: whether the request was handled successfully
  @param result: the payload to return to the client
  @param version: protocol version; only included if not C{None}
  @return: the serialized response

  """
  items = [(KEY_SUCCESS, success), (KEY_RESULT, result)]
  if version is not None:
    items.append((KEY_VERSION, version))
  response = dict(items)

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
355
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: the LUXI method name
  @param args: the method arguments
  @param version: protocol version; only included if not C{None}
  @return: the serialized request

  """
  request = dict([(KEY_METHOD, method), (KEY_ARGS, args)])
  if version is not None:
    request[KEY_VERSION] = version
  return serializer.DumpJson(request)
372
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @type transport_cb: callable
  @param transport_cb: called with the serialized request; must return
      the serialized response
  @param method: the LUXI method name
  @param args: the method arguments
  @param version: protocol version to send along with the request
  @return: the call's result, if the server reported success
  @raise errors.LuxiError: if the response carries a version other
      than C{version}
  @raise RequestError: if the server reported a failure and
      C{errors.MaybeRaise} did not raise anything for it

  """
  assert callable(transport_cb)

  raw_reply = transport_cb(FormatRequest(method, args, version=version))
  (success, result, resp_version) = ParseResponse(raw_reply)

  # Only check the version if the response actually contains one
  if not (resp_version is None or resp_version == version):
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
397
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; defaults
        to C{pathutils.MASTER_SOCKET}
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    The transport is instantiated (and thus connected) immediately.

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      # Deliberately best-effort: the transport is being discarded anyway
      pass

  def _SendMethodCall(self, data):
    """Sends serialized data and returns the serialized reply.

    On any error the transport is dropped, so that the next call
    reconnects, and the error is re-raised.

    """
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: the LUXI method name (normally one of the C{REQ_*}
        constants)
    @type args: list or tuple
    @param args: the method arguments
    @raise errors.ProgrammerError: if C{args} is not a list or tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Sends C{REQ_SET_DRAIN_FLAG} with the given flag."""
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Sends C{REQ_SET_WATCHER_PAUSE} with the given C{until} value."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submits a single job built from the given opcodes.

    @param ops: iterable of opcode objects; each is sent as the result of
        its C{__getstate__} method

    """
    # A list (not a lazy map object) so that it can always be serialized
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submits several jobs at once.

    @param jobs: iterable of jobs, each an iterable of opcode objects

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  def CancelJob(self, job_id):
    """Sends C{REQ_CANCEL_JOB} for the given job."""
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Sends C{REQ_ARCHIVE_JOB} for the given job."""
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Sends C{REQ_CHANGE_JOB_PRIORITY} for the given job and priority."""
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Sends C{REQ_AUTO_ARCHIVE_JOBS} with C{age} and a timeout.

    The timeout is derived from L{DEF_RWTO} the same way as
    L{WFJC_TIMEOUT} (29 seconds with the defaults), presumably so the
    server answers before the client's read timeout expires.

    """
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes on a job, with no overall time limit.

    Repeats L{WaitForJobChangeOnce} for as long as the answer is
    C{constants.JOB_NOTCHANGED}, and returns the first other answer.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Sends C{REQ_QUERY_JOBS} for the given jobs and fields."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Sends C{REQ_QUERY_INSTANCES} for the given instances and fields."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Sends C{REQ_QUERY_NODES} for the given nodes and fields."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Sends C{REQ_QUERY_GROUPS} for the given node groups and fields."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Sends C{REQ_QUERY_NETWORKS} for the given networks and fields."""
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Sends C{REQ_QUERY_EXPORTS} for the given nodes."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Sends C{REQ_QUERY_CLUSTER_INFO} (takes no arguments)."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Sends C{REQ_QUERY_CONFIG_VALUES} for the given fields."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Sends C{REQ_QUERY_TAGS} for the given object kind and name."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
585