Package ganeti :: Package confd :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.confd.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2009, 2010, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Ganeti confd client 
 32   
 33  Clients can use the confd client library to send requests to a group of master 
 34  candidates running confd. The expected usage is through the asyncore framework, 
 35  by sending queries, and asynchronously receiving replies through a callback. 
 36   
 37  This way the client library doesn't ever need to "wait" on a particular answer, 
 38  and can proceed even if some udp packets are lost. It's up to the user to 
 39  reschedule queries if they haven't received responses and they need them. 
 40   
 41  Example usage:: 
 42   
 43    client = ConfdClient(...) # includes callback specification 
 44    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) 
 45    client.SendRequest(req) 
 46    # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run() 
 47    # ... wait ... 
 48    # And your callback will be called by asyncore, when your query gets a 
 49    # response, or when it expires. 
 50   
 51  You can use the provided ConfdFilterCallback to act as a filter, only passing 
 52  "newer" answer to your callback, and filtering out outdated ones, or ones 
 53  confirming what you already got. 
 54   
 55  """ 
 56   
 57  # pylint: disable=E0203 
 58   
 59  # E0203: Access to member %r before its definition, since we use 
 60  # objects.py which doesn't explicitly initialise its members 
 61   
 62  import time 
 63  import random 
 64   
 65  from ganeti import utils 
 66  from ganeti import constants 
 67  from ganeti import objects 
 68  from ganeti import serializer 
 69  from ganeti import daemon # contains AsyncUDPSocket 
 70  from ganeti import errors 
 71  from ganeti import confd 
 72  from ganeti import ssconf 
 73  from ganeti import compat 
 74  from ganeti import netutils 
 75  from ganeti import pathutils 
76 77 78 -class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
79 """Confd udp asyncore client 80 81 This is kept separate from the main ConfdClient to make sure it's easy to 82 implement a non-asyncore based client library. 83 84 """
85 - def __init__(self, client, family):
86 """Constructor for ConfdAsyncUDPClient 87 88 @type client: L{ConfdClient} 89 @param client: client library, to pass the datagrams to 90 91 """ 92 daemon.AsyncUDPSocket.__init__(self, family) 93 self.client = client
94 95 # this method is overriding a daemon.AsyncUDPSocket method
96 - def handle_datagram(self, payload, ip, port):
97 self.client.HandleResponse(payload, ip, port)
98
99 100 -class _Request(object):
101 """Request status structure. 102 103 @ivar request: the request data 104 @ivar args: any extra arguments for the callback 105 @ivar expiry: the expiry timestamp of the request 106 @ivar sent: the set of contacted peers 107 @ivar rcvd: the set of peers who replied 108 109 """
110 - def __init__(self, request, args, expiry, sent):
111 self.request = request 112 self.args = args 113 self.expiry = expiry 114 self.sent = frozenset(sent) 115 self.rcvd = set()
116
117 118 -class ConfdClient(object):
119 """Send queries to confd, and get back answers. 120 121 Since the confd model works by querying multiple master candidates, and 122 getting back answers, this is an asynchronous library. It can either work 123 through asyncore or with your own handling. 124 125 @type _requests: dict 126 @ivar _requests: dictionary indexes by salt, which contains data 127 about the outstanding requests; the values are objects of type 128 L{_Request} 129 130 """
131 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
132 """Constructor for ConfdClient 133 134 @type hmac_key: string 135 @param hmac_key: hmac key to talk to confd 136 @type peers: list 137 @param peers: list of peer nodes 138 @type callback: f(L{ConfdUpcallPayload}) 139 @param callback: function to call when getting answers 140 @type port: integer 141 @param port: confd port (default: use GetDaemonPort) 142 @type logger: logging.Logger 143 @param logger: optional logger for internal conditions 144 145 """ 146 if not callable(callback): 147 raise errors.ProgrammerError("callback must be callable") 148 149 self.UpdatePeerList(peers) 150 self._SetPeersAddressFamily() 151 self._hmac_key = hmac_key 152 self._socket = ConfdAsyncUDPClient(self, self._family) 153 self._callback = callback 154 self._confd_port = port 155 self._logger = logger 156 self._requests = {} 157 158 if self._confd_port is None: 159 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
160
161 - def UpdatePeerList(self, peers):
162 """Update the list of peers 163 164 @type peers: list 165 @param peers: list of peer nodes 166 167 """ 168 # we are actually called from init, so: 169 # pylint: disable=W0201 170 if not isinstance(peers, list): 171 raise errors.ProgrammerError("peers must be a list") 172 # make a copy of peers, since we're going to shuffle the list, later 173 self._peers = list(peers)
174
175 - def _PackRequest(self, request, now=None):
176 """Prepare a request to be sent on the wire. 177 178 This function puts a proper salt in a confd request, puts the proper salt, 179 and adds the correct magic number. 180 181 """ 182 if now is None: 183 now = time.time() 184 tstamp = "%d" % now 185 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp) 186 return confd.PackMagic(req)
187
188 - def _UnpackReply(self, payload):
189 in_payload = confd.UnpackMagic(payload) 190 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) 191 answer = objects.ConfdReply.FromDict(dict_answer) 192 return answer, salt
193
194 - def ExpireRequests(self):
195 """Delete all the expired requests. 196 197 """ 198 now = time.time() 199 for rsalt, rq in self._requests.items(): 200 if now >= rq.expiry: 201 del self._requests[rsalt] 202 client_reply = ConfdUpcallPayload(salt=rsalt, 203 type=UPCALL_EXPIRE, 204 orig_request=rq.request, 205 extra_args=rq.args, 206 client=self, 207 ) 208 self._callback(client_reply)
209
210 - def SendRequest(self, request, args=None, coverage=0, async=True):
211 """Send a confd request to some MCs 212 213 @type request: L{objects.ConfdRequest} 214 @param request: the request to send 215 @type args: tuple 216 @param args: additional callback arguments 217 @type coverage: integer 218 @param coverage: number of remote nodes to contact; if default 219 (0), it will use a reasonable default 220 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is 221 passed, it will use the maximum number of peers, otherwise the 222 number passed in will be used 223 @type async: boolean 224 @param async: handle the write asynchronously 225 226 """ 227 if coverage == 0: 228 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) 229 elif coverage == -1: 230 coverage = len(self._peers) 231 232 if coverage > len(self._peers): 233 raise errors.ConfdClientError("Not enough MCs known to provide the" 234 " desired coverage") 235 236 if not request.rsalt: 237 raise errors.ConfdClientError("Missing request rsalt") 238 239 self.ExpireRequests() 240 if request.rsalt in self._requests: 241 raise errors.ConfdClientError("Duplicate request rsalt") 242 243 if request.type not in constants.CONFD_REQS: 244 raise errors.ConfdClientError("Invalid request type") 245 246 random.shuffle(self._peers) 247 targets = self._peers[:coverage] 248 249 now = time.time() 250 payload = self._PackRequest(request, now=now) 251 252 for target in targets: 253 try: 254 self._socket.enqueue_send(target, self._confd_port, payload) 255 except errors.UdpDataSizeError: 256 raise errors.ConfdClientError("Request too big") 257 258 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT 259 self._requests[request.rsalt] = _Request(request, args, expire_time, 260 targets) 261 262 if not async: 263 self.FlushSendQueue()
264
265 - def HandleResponse(self, payload, ip, port):
266 """Asynchronous handler for a confd reply 267 268 Call the relevant callback associated to the current request. 269 270 """ 271 try: 272 try: 273 answer, salt = self._UnpackReply(payload) 274 except (errors.SignatureError, errors.ConfdMagicError), err: 275 if self._logger: 276 self._logger.debug("Discarding broken package: %s" % err) 277 return 278 279 try: 280 rq = self._requests[salt] 281 except KeyError: 282 if self._logger: 283 self._logger.debug("Discarding unknown (expired?) reply: %s" % err) 284 return 285 286 rq.rcvd.add(ip) 287 288 client_reply = ConfdUpcallPayload(salt=salt, 289 type=UPCALL_REPLY, 290 server_reply=answer, 291 orig_request=rq.request, 292 server_ip=ip, 293 server_port=port, 294 extra_args=rq.args, 295 client=self, 296 ) 297 self._callback(client_reply) 298 299 finally: 300 self.ExpireRequests()
301
302 - def FlushSendQueue(self):
303 """Send out all pending requests. 304 305 Can be used for synchronous client use. 306 307 """ 308 while self._socket.writable(): 309 self._socket.handle_write()
310
311 - def ReceiveReply(self, timeout=1):
312 """Receive one reply. 313 314 @type timeout: float 315 @param timeout: how long to wait for the reply 316 @rtype: boolean 317 @return: True if some data has been handled, False otherwise 318 319 """ 320 return self._socket.process_next_packet(timeout=timeout)
321 322 @staticmethod
323 - def _NeededReplies(peer_cnt):
324 """Compute the minimum safe number of replies for a query. 325 326 The algorithm is designed to work well for both small and big 327 number of peers: 328 - for less than three, we require all responses 329 - for less than five, we allow one miss 330 - otherwise, half the number plus one 331 332 This guarantees that we progress monotonically: 1->1, 2->2, 3->2, 333 4->2, 5->3, 6->3, 7->4, etc. 334 335 @type peer_cnt: int 336 @param peer_cnt: the number of peers contacted 337 @rtype: int 338 @return: the number of replies which should give a safe coverage 339 340 """ 341 if peer_cnt < 3: 342 return peer_cnt 343 elif peer_cnt < 5: 344 return peer_cnt - 1 345 else: 346 return int(peer_cnt / 2) + 1
347
348 - def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
349 """Wait for replies to a given request. 350 351 This method will wait until either the timeout expires or a 352 minimum number (computed using L{_NeededReplies}) of replies are 353 received for the given salt. It is useful when doing synchronous 354 calls to this library. 355 356 @param salt: the salt of the request we want responses for 357 @param timeout: the maximum timeout (should be less or equal to 358 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT} 359 @rtype: tuple 360 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the 361 request is unknown, timed_out will be true and the counters 362 will be zero 363 364 """ 365 def _CheckResponse(): 366 if salt not in self._requests: 367 # expired? 368 if self._logger: 369 self._logger.debug("Discarding unknown/expired request: %s" % salt) 370 return MISSING 371 rq = self._requests[salt] 372 if len(rq.rcvd) >= expected: 373 # already got all replies 374 return (False, len(rq.sent), len(rq.rcvd)) 375 # else wait, using default timeout 376 self.ReceiveReply() 377 raise utils.RetryAgain()
378 379 MISSING = (True, 0, 0) 380 381 if salt not in self._requests: 382 return MISSING 383 # extend the expire time with the current timeout, so that we 384 # don't get the request expired from under us 385 rq = self._requests[salt] 386 rq.expiry += timeout 387 sent = len(rq.sent) 388 expected = self._NeededReplies(sent) 389 390 try: 391 return utils.Retry(_CheckResponse, 0, timeout) 392 except utils.RetryTimeout: 393 if salt in self._requests: 394 rq = self._requests[salt] 395 return (True, len(rq.sent), len(rq.rcvd)) 396 else: 397 return MISSING
398
399 - def _SetPeersAddressFamily(self):
400 if not self._peers: 401 raise errors.ConfdClientError("Peer list empty") 402 try: 403 peer = self._peers[0] 404 self._family = netutils.IPAddress.GetAddressFamily(peer) 405 for peer in self._peers[1:]: 406 if netutils.IPAddress.GetAddressFamily(peer) != self._family: 407 raise errors.ConfdClientError("Peers must be of same address family") 408 except errors.IPAddressError: 409 raise errors.ConfdClientError("Peer address %s invalid" % peer)
410 411 412 # UPCALL_REPLY: server reply upcall 413 # has all ConfdUpcallPayload fields populated 414 UPCALL_REPLY = 1 415 # UPCALL_EXPIRE: internal library request expire 416 # has only salt, type, orig_request and extra_args 417 UPCALL_EXPIRE = 2 418 CONFD_UPCALL_TYPES = compat.UniqueFrozenset([ 419 UPCALL_REPLY, 420 UPCALL_EXPIRE, 421 ])
422 423 424 -class ConfdUpcallPayload(objects.ConfigObject):
425 """Callback argument for confd replies 426 427 @type salt: string 428 @ivar salt: salt associated with the query 429 @type type: one of confd.client.CONFD_UPCALL_TYPES 430 @ivar type: upcall type (server reply, expired request, ...) 431 @type orig_request: L{objects.ConfdRequest} 432 @ivar orig_request: original request 433 @type server_reply: L{objects.ConfdReply} 434 @ivar server_reply: server reply 435 @type server_ip: string 436 @ivar server_ip: answering server ip address 437 @type server_port: int 438 @ivar server_port: answering server port 439 @type extra_args: any 440 @ivar extra_args: 'args' argument of the SendRequest function 441 @type client: L{ConfdClient} 442 @ivar client: current confd client instance 443 444 """ 445 __slots__ = [ 446 "salt", 447 "type", 448 "orig_request", 449 "server_reply", 450 "server_ip", 451 "server_port", 452 "extra_args", 453 "client", 454 ]
455
456 457 -class ConfdClientRequest(objects.ConfdRequest):
458 """This is the client-side version of ConfdRequest. 459 460 This version of the class helps creating requests, on the client side, by 461 filling in some default values. 462 463 """
464 - def __init__(self, **kwargs):
465 objects.ConfdRequest.__init__(self, **kwargs) 466 if not self.rsalt: 467 self.rsalt = utils.NewUUID() 468 if not self.protocol: 469 self.protocol = constants.CONFD_PROTOCOL_VERSION 470 if self.type not in constants.CONFD_REQS: 471 raise errors.ConfdClientError("Invalid request type")
472
473 474 -class ConfdFilterCallback(object):
475 """Callback that calls another callback, but filters duplicate results. 476 477 @ivar consistent: a dictionary indexed by salt; for each salt, if 478 all responses ware identical, this will be True; this is the 479 expected state on a healthy cluster; on inconsistent or 480 partitioned clusters, this might be False, if we see answers 481 with the same serial but different contents 482 483 """
484 - def __init__(self, callback, logger=None):
485 """Constructor for ConfdFilterCallback 486 487 @type callback: f(L{ConfdUpcallPayload}) 488 @param callback: function to call when getting answers 489 @type logger: logging.Logger 490 @param logger: optional logger for internal conditions 491 492 """ 493 if not callable(callback): 494 raise errors.ProgrammerError("callback must be callable") 495 496 self._callback = callback 497 self._logger = logger 498 # answers contains a dict of salt -> answer 499 self._answers = {} 500 self.consistent = {}
501
502 - def _LogFilter(self, salt, new_reply, old_reply):
503 if not self._logger: 504 return 505 506 if new_reply.serial > old_reply.serial: 507 self._logger.debug("Filtering confirming answer, with newer" 508 " serial for query %s" % salt) 509 elif new_reply.serial == old_reply.serial: 510 if new_reply.answer != old_reply.answer: 511 self._logger.warning("Got incoherent answers for query %s" 512 " (serial: %s)" % (salt, new_reply.serial)) 513 else: 514 self._logger.debug("Filtering confirming answer, with same" 515 " serial for query %s" % salt) 516 else: 517 self._logger.debug("Filtering outdated answer for query %s" 518 " serial: (%d < %d)" % (salt, old_reply.serial, 519 new_reply.serial))
520
521 - def _HandleExpire(self, up):
522 # if we have no answer we have received none, before the expiration. 523 if up.salt in self._answers: 524 del self._answers[up.salt] 525 if up.salt in self.consistent: 526 del self.consistent[up.salt]
527
528 - def _HandleReply(self, up):
529 """Handle a single confd reply, and decide whether to filter it. 530 531 @rtype: boolean 532 @return: True if the reply should be filtered, False if it should be passed 533 on to the up-callback 534 535 """ 536 filter_upcall = False 537 salt = up.salt 538 if salt not in self.consistent: 539 self.consistent[salt] = True 540 if salt not in self._answers: 541 # first answer for a query (don't filter, and record) 542 self._answers[salt] = up.server_reply 543 elif up.server_reply.serial > self._answers[salt].serial: 544 # newer answer (record, and compare contents) 545 old_answer = self._answers[salt] 546 self._answers[salt] = up.server_reply 547 if up.server_reply.answer == old_answer.answer: 548 # same content (filter) (version upgrade was unrelated) 549 filter_upcall = True 550 self._LogFilter(salt, up.server_reply, old_answer) 551 # else: different content, pass up a second answer 552 else: 553 # older or same-version answer (duplicate or outdated, filter) 554 if (up.server_reply.serial == self._answers[salt].serial and 555 up.server_reply.answer != self._answers[salt].answer): 556 self.consistent[salt] = False 557 filter_upcall = True 558 self._LogFilter(salt, up.server_reply, self._answers[salt]) 559 560 return filter_upcall
561
562 - def __call__(self, up):
563 """Filtering callback 564 565 @type up: L{ConfdUpcallPayload} 566 @param up: upper callback 567 568 """ 569 filter_upcall = False 570 if up.type == UPCALL_REPLY: 571 filter_upcall = self._HandleReply(up) 572 elif up.type == UPCALL_EXPIRE: 573 self._HandleExpire(up) 574 575 if not filter_upcall: 576 self._callback(up)
577
578 579 -class ConfdCountingCallback(object):
580 """Callback that calls another callback, and counts the answers 581 582 """
583 - def __init__(self, callback, logger=None):
584 """Constructor for ConfdCountingCallback 585 586 @type callback: f(L{ConfdUpcallPayload}) 587 @param callback: function to call when getting answers 588 @type logger: logging.Logger 589 @param logger: optional logger for internal conditions 590 591 """ 592 if not callable(callback): 593 raise errors.ProgrammerError("callback must be callable") 594 595 self._callback = callback 596 self._logger = logger 597 # answers contains a dict of salt -> count 598 self._answers = {}
599
600 - def RegisterQuery(self, salt):
601 if salt in self._answers: 602 raise errors.ProgrammerError("query already registered") 603 self._answers[salt] = 0
604
605 - def AllAnswered(self):
606 """Have all the registered queries received at least an answer? 607 608 """ 609 return compat.all(self._answers.values())
610
611 - def _HandleExpire(self, up):
612 # if we have no answer we have received none, before the expiration. 613 if up.salt in self._answers: 614 del self._answers[up.salt]
615
616 - def _HandleReply(self, up):
617 """Handle a single confd reply, and decide whether to filter it. 618 619 @rtype: boolean 620 @return: True if the reply should be filtered, False if it should be passed 621 on to the up-callback 622 623 """ 624 if up.salt in self._answers: 625 self._answers[up.salt] += 1
626
627 - def __call__(self, up):
628 """Filtering callback 629 630 @type up: L{ConfdUpcallPayload} 631 @param up: upper callback 632 633 """ 634 if up.type == UPCALL_REPLY: 635 self._HandleReply(up) 636 elif up.type == UPCALL_EXPIRE: 637 self._HandleExpire(up) 638 self._callback(up)
639
640 641 -class StoreResultCallback(object):
642 """Callback that simply stores the most recent answer. 643 644 @ivar _answers: dict of salt to (have_answer, reply) 645 646 """ 647 _NO_KEY = (False, None) 648
649 - def __init__(self):
650 """Constructor for StoreResultCallback 651 652 """ 653 # answers contains a dict of salt -> best result 654 self._answers = {}
655
656 - def GetResponse(self, salt):
657 """Return the best match for a salt 658 659 """ 660 return self._answers.get(salt, self._NO_KEY)
661
662 - def _HandleExpire(self, up):
663 """Expiration handler. 664 665 """ 666 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY: 667 del self._answers[up.salt]
668
669 - def _HandleReply(self, up):
670 """Handle a single confd reply, and decide whether to filter it. 671 672 """ 673 self._answers[up.salt] = (True, up)
674
675 - def __call__(self, up):
676 """Filtering callback 677 678 @type up: L{ConfdUpcallPayload} 679 @param up: upper callback 680 681 """ 682 if up.type == UPCALL_REPLY: 683 self._HandleReply(up) 684 elif up.type == UPCALL_EXPIRE: 685 self._HandleExpire(up)
686
687 688 -def GetConfdClient(callback):
689 """Return a client configured using the given callback. 690 691 This is handy to abstract the MC list and HMAC key reading. 692 693 @attention: This should only be called on nodes which are part of a 694 cluster, since it depends on a valid (ganeti) data directory; 695 for code running outside of a cluster, you need to create the 696 client manually 697 698 """ 699 ss = ssconf.SimpleStore() 700 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS) 701 mc_list = utils.ReadFile(mc_file).splitlines() 702 hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY) 703 return ConfdClient(hmac_key, mc_list, callback)
704