Package ganeti :: Package confd :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.confd.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2009, 2010, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Ganeti confd client 
 23   
 24  Clients can use the confd client library to send requests to a group of master 
 25  candidates running confd. The expected usage is through the asyncore framework, 
 26  by sending queries, and asynchronously receiving replies through a callback. 
 27   
 28  This way the client library doesn't ever need to "wait" on a particular answer, 
 29  and can proceed even if some udp packets are lost. It's up to the user to 
 30  reschedule queries if they haven't received responses and they need them. 
 31   
 32  Example usage:: 
 33   
 34    client = ConfdClient(...) # includes callback specification 
 35    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) 
 36    client.SendRequest(req) 
 37    # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run() 
 38    # ... wait ... 
 39    # And your callback will be called by asyncore, when your query gets a 
 40    # response, or when it expires. 
 41   
 42  You can use the provided ConfdFilterCallback to act as a filter, only passing 
 43  "newer" answer to your callback, and filtering out outdated ones, or ones 
 44  confirming what you already got. 
 45   
 46  """ 
 47   
 48  # pylint: disable=E0203 
 49   
 50  # E0203: Access to member %r before its definition, since we use 
 51  # objects.py which doesn't explicitly initialise its members 
 52   
 53  import time 
 54  import random 
 55   
 56  from ganeti import utils 
 57  from ganeti import constants 
 58  from ganeti import objects 
 59  from ganeti import serializer 
 60  from ganeti import daemon # contains AsyncUDPSocket 
 61  from ganeti import errors 
 62  from ganeti import confd 
 63  from ganeti import ssconf 
 64  from ganeti import compat 
 65  from ganeti import netutils 
 66  from ganeti import pathutils 
67 68 69 -class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
70 """Confd udp asyncore client 71 72 This is kept separate from the main ConfdClient to make sure it's easy to 73 implement a non-asyncore based client library. 74 75 """
76 - def __init__(self, client, family):
77 """Constructor for ConfdAsyncUDPClient 78 79 @type client: L{ConfdClient} 80 @param client: client library, to pass the datagrams to 81 82 """ 83 daemon.AsyncUDPSocket.__init__(self, family) 84 self.client = client
85 86 # this method is overriding a daemon.AsyncUDPSocket method
87 - def handle_datagram(self, payload, ip, port):
88 self.client.HandleResponse(payload, ip, port)
89
90 91 -class _Request(object):
92 """Request status structure. 93 94 @ivar request: the request data 95 @ivar args: any extra arguments for the callback 96 @ivar expiry: the expiry timestamp of the request 97 @ivar sent: the set of contacted peers 98 @ivar rcvd: the set of peers who replied 99 100 """
101 - def __init__(self, request, args, expiry, sent):
102 self.request = request 103 self.args = args 104 self.expiry = expiry 105 self.sent = frozenset(sent) 106 self.rcvd = set()
107
108 109 -class ConfdClient:
110 """Send queries to confd, and get back answers. 111 112 Since the confd model works by querying multiple master candidates, and 113 getting back answers, this is an asynchronous library. It can either work 114 through asyncore or with your own handling. 115 116 @type _requests: dict 117 @ivar _requests: dictionary indexes by salt, which contains data 118 about the outstanding requests; the values are objects of type 119 L{_Request} 120 121 """
122 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
123 """Constructor for ConfdClient 124 125 @type hmac_key: string 126 @param hmac_key: hmac key to talk to confd 127 @type peers: list 128 @param peers: list of peer nodes 129 @type callback: f(L{ConfdUpcallPayload}) 130 @param callback: function to call when getting answers 131 @type port: integer 132 @param port: confd port (default: use GetDaemonPort) 133 @type logger: logging.Logger 134 @param logger: optional logger for internal conditions 135 136 """ 137 if not callable(callback): 138 raise errors.ProgrammerError("callback must be callable") 139 140 self.UpdatePeerList(peers) 141 self._SetPeersAddressFamily() 142 self._hmac_key = hmac_key 143 self._socket = ConfdAsyncUDPClient(self, self._family) 144 self._callback = callback 145 self._confd_port = port 146 self._logger = logger 147 self._requests = {} 148 149 if self._confd_port is None: 150 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
151
152 - def UpdatePeerList(self, peers):
153 """Update the list of peers 154 155 @type peers: list 156 @param peers: list of peer nodes 157 158 """ 159 # we are actually called from init, so: 160 # pylint: disable=W0201 161 if not isinstance(peers, list): 162 raise errors.ProgrammerError("peers must be a list") 163 # make a copy of peers, since we're going to shuffle the list, later 164 self._peers = list(peers)
165
166 - def _PackRequest(self, request, now=None):
167 """Prepare a request to be sent on the wire. 168 169 This function puts a proper salt in a confd request, puts the proper salt, 170 and adds the correct magic number. 171 172 """ 173 if now is None: 174 now = time.time() 175 tstamp = "%d" % now 176 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp) 177 return confd.PackMagic(req)
178
179 - def _UnpackReply(self, payload):
180 in_payload = confd.UnpackMagic(payload) 181 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) 182 answer = objects.ConfdReply.FromDict(dict_answer) 183 return answer, salt
184
185 - def ExpireRequests(self):
186 """Delete all the expired requests. 187 188 """ 189 now = time.time() 190 for rsalt, rq in self._requests.items(): 191 if now >= rq.expiry: 192 del self._requests[rsalt] 193 client_reply = ConfdUpcallPayload(salt=rsalt, 194 type=UPCALL_EXPIRE, 195 orig_request=rq.request, 196 extra_args=rq.args, 197 client=self, 198 ) 199 self._callback(client_reply)
200
201 - def SendRequest(self, request, args=None, coverage=0, async=True):
202 """Send a confd request to some MCs 203 204 @type request: L{objects.ConfdRequest} 205 @param request: the request to send 206 @type args: tuple 207 @param args: additional callback arguments 208 @type coverage: integer 209 @param coverage: number of remote nodes to contact; if default 210 (0), it will use a reasonable default 211 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is 212 passed, it will use the maximum number of peers, otherwise the 213 number passed in will be used 214 @type async: boolean 215 @param async: handle the write asynchronously 216 217 """ 218 if coverage == 0: 219 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) 220 elif coverage == -1: 221 coverage = len(self._peers) 222 223 if coverage > len(self._peers): 224 raise errors.ConfdClientError("Not enough MCs known to provide the" 225 " desired coverage") 226 227 if not request.rsalt: 228 raise errors.ConfdClientError("Missing request rsalt") 229 230 self.ExpireRequests() 231 if request.rsalt in self._requests: 232 raise errors.ConfdClientError("Duplicate request rsalt") 233 234 if request.type not in constants.CONFD_REQS: 235 raise errors.ConfdClientError("Invalid request type") 236 237 random.shuffle(self._peers) 238 targets = self._peers[:coverage] 239 240 now = time.time() 241 payload = self._PackRequest(request, now=now) 242 243 for target in targets: 244 try: 245 self._socket.enqueue_send(target, self._confd_port, payload) 246 except errors.UdpDataSizeError: 247 raise errors.ConfdClientError("Request too big") 248 249 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT 250 self._requests[request.rsalt] = _Request(request, args, expire_time, 251 targets) 252 253 if not async: 254 self.FlushSendQueue()
255
256 - def HandleResponse(self, payload, ip, port):
257 """Asynchronous handler for a confd reply 258 259 Call the relevant callback associated to the current request. 260 261 """ 262 try: 263 try: 264 answer, salt = self._UnpackReply(payload) 265 except (errors.SignatureError, errors.ConfdMagicError), err: 266 if self._logger: 267 self._logger.debug("Discarding broken package: %s" % err) 268 return 269 270 try: 271 rq = self._requests[salt] 272 except KeyError: 273 if self._logger: 274 self._logger.debug("Discarding unknown (expired?) reply: %s" % err) 275 return 276 277 rq.rcvd.add(ip) 278 279 client_reply = ConfdUpcallPayload(salt=salt, 280 type=UPCALL_REPLY, 281 server_reply=answer, 282 orig_request=rq.request, 283 server_ip=ip, 284 server_port=port, 285 extra_args=rq.args, 286 client=self, 287 ) 288 self._callback(client_reply) 289 290 finally: 291 self.ExpireRequests()
292
293 - def FlushSendQueue(self):
294 """Send out all pending requests. 295 296 Can be used for synchronous client use. 297 298 """ 299 while self._socket.writable(): 300 self._socket.handle_write()
301
302 - def ReceiveReply(self, timeout=1):
303 """Receive one reply. 304 305 @type timeout: float 306 @param timeout: how long to wait for the reply 307 @rtype: boolean 308 @return: True if some data has been handled, False otherwise 309 310 """ 311 return self._socket.process_next_packet(timeout=timeout)
312 313 @staticmethod
314 - def _NeededReplies(peer_cnt):
315 """Compute the minimum safe number of replies for a query. 316 317 The algorithm is designed to work well for both small and big 318 number of peers: 319 - for less than three, we require all responses 320 - for less than five, we allow one miss 321 - otherwise, half the number plus one 322 323 This guarantees that we progress monotonically: 1->1, 2->2, 3->2, 324 4->2, 5->3, 6->3, 7->4, etc. 325 326 @type peer_cnt: int 327 @param peer_cnt: the number of peers contacted 328 @rtype: int 329 @return: the number of replies which should give a safe coverage 330 331 """ 332 if peer_cnt < 3: 333 return peer_cnt 334 elif peer_cnt < 5: 335 return peer_cnt - 1 336 else: 337 return int(peer_cnt / 2) + 1
338
339 - def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
340 """Wait for replies to a given request. 341 342 This method will wait until either the timeout expires or a 343 minimum number (computed using L{_NeededReplies}) of replies are 344 received for the given salt. It is useful when doing synchronous 345 calls to this library. 346 347 @param salt: the salt of the request we want responses for 348 @param timeout: the maximum timeout (should be less or equal to 349 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT} 350 @rtype: tuple 351 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the 352 request is unknown, timed_out will be true and the counters 353 will be zero 354 355 """ 356 def _CheckResponse(): 357 if salt not in self._requests: 358 # expired? 359 if self._logger: 360 self._logger.debug("Discarding unknown/expired request: %s" % salt) 361 return MISSING 362 rq = self._requests[salt] 363 if len(rq.rcvd) >= expected: 364 # already got all replies 365 return (False, len(rq.sent), len(rq.rcvd)) 366 # else wait, using default timeout 367 self.ReceiveReply() 368 raise utils.RetryAgain()
369 370 MISSING = (True, 0, 0) 371 372 if salt not in self._requests: 373 return MISSING 374 # extend the expire time with the current timeout, so that we 375 # don't get the request expired from under us 376 rq = self._requests[salt] 377 rq.expiry += timeout 378 sent = len(rq.sent) 379 expected = self._NeededReplies(sent) 380 381 try: 382 return utils.Retry(_CheckResponse, 0, timeout) 383 except utils.RetryTimeout: 384 if salt in self._requests: 385 rq = self._requests[salt] 386 return (True, len(rq.sent), len(rq.rcvd)) 387 else: 388 return MISSING
389
390 - def _SetPeersAddressFamily(self):
391 if not self._peers: 392 raise errors.ConfdClientError("Peer list empty") 393 try: 394 peer = self._peers[0] 395 self._family = netutils.IPAddress.GetAddressFamily(peer) 396 for peer in self._peers[1:]: 397 if netutils.IPAddress.GetAddressFamily(peer) != self._family: 398 raise errors.ConfdClientError("Peers must be of same address family") 399 except errors.IPAddressError: 400 raise errors.ConfdClientError("Peer address %s invalid" % peer)
401 402 403 # UPCALL_REPLY: server reply upcall 404 # has all ConfdUpcallPayload fields populated 405 UPCALL_REPLY = 1 406 # UPCALL_EXPIRE: internal library request expire 407 # has only salt, type, orig_request and extra_args 408 UPCALL_EXPIRE = 2 409 CONFD_UPCALL_TYPES = compat.UniqueFrozenset([ 410 UPCALL_REPLY, 411 UPCALL_EXPIRE, 412 ])
413 414 415 -class ConfdUpcallPayload(objects.ConfigObject):
416 """Callback argument for confd replies 417 418 @type salt: string 419 @ivar salt: salt associated with the query 420 @type type: one of confd.client.CONFD_UPCALL_TYPES 421 @ivar type: upcall type (server reply, expired request, ...) 422 @type orig_request: L{objects.ConfdRequest} 423 @ivar orig_request: original request 424 @type server_reply: L{objects.ConfdReply} 425 @ivar server_reply: server reply 426 @type server_ip: string 427 @ivar server_ip: answering server ip address 428 @type server_port: int 429 @ivar server_port: answering server port 430 @type extra_args: any 431 @ivar extra_args: 'args' argument of the SendRequest function 432 @type client: L{ConfdClient} 433 @ivar client: current confd client instance 434 435 """ 436 __slots__ = [ 437 "salt", 438 "type", 439 "orig_request", 440 "server_reply", 441 "server_ip", 442 "server_port", 443 "extra_args", 444 "client", 445 ]
446
447 448 -class ConfdClientRequest(objects.ConfdRequest):
449 """This is the client-side version of ConfdRequest. 450 451 This version of the class helps creating requests, on the client side, by 452 filling in some default values. 453 454 """
455 - def __init__(self, **kwargs):
456 objects.ConfdRequest.__init__(self, **kwargs) 457 if not self.rsalt: 458 self.rsalt = utils.NewUUID() 459 if not self.protocol: 460 self.protocol = constants.CONFD_PROTOCOL_VERSION 461 if self.type not in constants.CONFD_REQS: 462 raise errors.ConfdClientError("Invalid request type")
463
464 465 -class ConfdFilterCallback:
466 """Callback that calls another callback, but filters duplicate results. 467 468 @ivar consistent: a dictionary indexed by salt; for each salt, if 469 all responses ware identical, this will be True; this is the 470 expected state on a healthy cluster; on inconsistent or 471 partitioned clusters, this might be False, if we see answers 472 with the same serial but different contents 473 474 """
475 - def __init__(self, callback, logger=None):
476 """Constructor for ConfdFilterCallback 477 478 @type callback: f(L{ConfdUpcallPayload}) 479 @param callback: function to call when getting answers 480 @type logger: logging.Logger 481 @param logger: optional logger for internal conditions 482 483 """ 484 if not callable(callback): 485 raise errors.ProgrammerError("callback must be callable") 486 487 self._callback = callback 488 self._logger = logger 489 # answers contains a dict of salt -> answer 490 self._answers = {} 491 self.consistent = {}
492
493 - def _LogFilter(self, salt, new_reply, old_reply):
494 if not self._logger: 495 return 496 497 if new_reply.serial > old_reply.serial: 498 self._logger.debug("Filtering confirming answer, with newer" 499 " serial for query %s" % salt) 500 elif new_reply.serial == old_reply.serial: 501 if new_reply.answer != old_reply.answer: 502 self._logger.warning("Got incoherent answers for query %s" 503 " (serial: %s)" % (salt, new_reply.serial)) 504 else: 505 self._logger.debug("Filtering confirming answer, with same" 506 " serial for query %s" % salt) 507 else: 508 self._logger.debug("Filtering outdated answer for query %s" 509 " serial: (%d < %d)" % (salt, old_reply.serial, 510 new_reply.serial))
511
512 - def _HandleExpire(self, up):
513 # if we have no answer we have received none, before the expiration. 514 if up.salt in self._answers: 515 del self._answers[up.salt] 516 if up.salt in self.consistent: 517 del self.consistent[up.salt]
518
519 - def _HandleReply(self, up):
520 """Handle a single confd reply, and decide whether to filter it. 521 522 @rtype: boolean 523 @return: True if the reply should be filtered, False if it should be passed 524 on to the up-callback 525 526 """ 527 filter_upcall = False 528 salt = up.salt 529 if salt not in self.consistent: 530 self.consistent[salt] = True 531 if salt not in self._answers: 532 # first answer for a query (don't filter, and record) 533 self._answers[salt] = up.server_reply 534 elif up.server_reply.serial > self._answers[salt].serial: 535 # newer answer (record, and compare contents) 536 old_answer = self._answers[salt] 537 self._answers[salt] = up.server_reply 538 if up.server_reply.answer == old_answer.answer: 539 # same content (filter) (version upgrade was unrelated) 540 filter_upcall = True 541 self._LogFilter(salt, up.server_reply, old_answer) 542 # else: different content, pass up a second answer 543 else: 544 # older or same-version answer (duplicate or outdated, filter) 545 if (up.server_reply.serial == self._answers[salt].serial and 546 up.server_reply.answer != self._answers[salt].answer): 547 self.consistent[salt] = False 548 filter_upcall = True 549 self._LogFilter(salt, up.server_reply, self._answers[salt]) 550 551 return filter_upcall
552
553 - def __call__(self, up):
554 """Filtering callback 555 556 @type up: L{ConfdUpcallPayload} 557 @param up: upper callback 558 559 """ 560 filter_upcall = False 561 if up.type == UPCALL_REPLY: 562 filter_upcall = self._HandleReply(up) 563 elif up.type == UPCALL_EXPIRE: 564 self._HandleExpire(up) 565 566 if not filter_upcall: 567 self._callback(up)
568
569 570 -class ConfdCountingCallback:
571 """Callback that calls another callback, and counts the answers 572 573 """
574 - def __init__(self, callback, logger=None):
575 """Constructor for ConfdCountingCallback 576 577 @type callback: f(L{ConfdUpcallPayload}) 578 @param callback: function to call when getting answers 579 @type logger: logging.Logger 580 @param logger: optional logger for internal conditions 581 582 """ 583 if not callable(callback): 584 raise errors.ProgrammerError("callback must be callable") 585 586 self._callback = callback 587 self._logger = logger 588 # answers contains a dict of salt -> count 589 self._answers = {}
590
591 - def RegisterQuery(self, salt):
592 if salt in self._answers: 593 raise errors.ProgrammerError("query already registered") 594 self._answers[salt] = 0
595
596 - def AllAnswered(self):
597 """Have all the registered queries received at least an answer? 598 599 """ 600 return compat.all(self._answers.values())
601
602 - def _HandleExpire(self, up):
603 # if we have no answer we have received none, before the expiration. 604 if up.salt in self._answers: 605 del self._answers[up.salt]
606
607 - def _HandleReply(self, up):
608 """Handle a single confd reply, and decide whether to filter it. 609 610 @rtype: boolean 611 @return: True if the reply should be filtered, False if it should be passed 612 on to the up-callback 613 614 """ 615 if up.salt in self._answers: 616 self._answers[up.salt] += 1
617
618 - def __call__(self, up):
619 """Filtering callback 620 621 @type up: L{ConfdUpcallPayload} 622 @param up: upper callback 623 624 """ 625 if up.type == UPCALL_REPLY: 626 self._HandleReply(up) 627 elif up.type == UPCALL_EXPIRE: 628 self._HandleExpire(up) 629 self._callback(up)
630
631 632 -class StoreResultCallback:
633 """Callback that simply stores the most recent answer. 634 635 @ivar _answers: dict of salt to (have_answer, reply) 636 637 """ 638 _NO_KEY = (False, None) 639
640 - def __init__(self):
641 """Constructor for StoreResultCallback 642 643 """ 644 # answers contains a dict of salt -> best result 645 self._answers = {}
646
647 - def GetResponse(self, salt):
648 """Return the best match for a salt 649 650 """ 651 return self._answers.get(salt, self._NO_KEY)
652
653 - def _HandleExpire(self, up):
654 """Expiration handler. 655 656 """ 657 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY: 658 del self._answers[up.salt]
659
660 - def _HandleReply(self, up):
661 """Handle a single confd reply, and decide whether to filter it. 662 663 """ 664 self._answers[up.salt] = (True, up)
665
666 - def __call__(self, up):
667 """Filtering callback 668 669 @type up: L{ConfdUpcallPayload} 670 @param up: upper callback 671 672 """ 673 if up.type == UPCALL_REPLY: 674 self._HandleReply(up) 675 elif up.type == UPCALL_EXPIRE: 676 self._HandleExpire(up)
677
678 679 -def GetConfdClient(callback):
680 """Return a client configured using the given callback. 681 682 This is handy to abstract the MC list and HMAC key reading. 683 684 @attention: This should only be called on nodes which are part of a 685 cluster, since it depends on a valid (ganeti) data directory; 686 for code running outside of a cluster, you need to create the 687 client manually 688 689 """ 690 ss = ssconf.SimpleStore() 691 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS) 692 mc_list = utils.ReadFile(mc_file).splitlines() 693 hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY) 694 return ConfdClient(hmac_key, mc_list, callback)
695