Package ganeti :: Package confd :: Module client
[hide private]
[frames] | no frames]

Source Code for Module ganeti.confd.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2009, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Ganeti confd client 
 23   
 24  Clients can use the confd client library to send requests to a group of master 
 25  candidates running confd. The expected usage is through the asyncore framework, 
 26  by sending queries, and asynchronously receiving replies through a callback. 
 27   
 28  This way the client library doesn't ever need to "wait" on a particular answer, 
 29  and can proceed even if some udp packets are lost. It's up to the user to 
 30  reschedule queries if they haven't received responses and they need them. 
 31   
 32  Example usage:: 
 33   
 34    client = ConfdClient(...) # includes callback specification 
 35    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) 
 36    client.SendRequest(req) 
 37    # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run() 
 38    # ... wait ... 
 39    # And your callback will be called by asyncore, when your query gets a 
 40    # response, or when it expires. 
 41   
 42  You can use the provided ConfdFilterCallback to act as a filter, only passing 
 43  "newer" answer to your callback, and filtering out outdated ones, or ones 
 44  confirming what you already got. 
 45   
 46  """ 
 47   
 48  # pylint: disable-msg=E0203 
 49   
 50  # E0203: Access to member %r before its definition, since we use 
 51  # objects.py which doesn't explicitely initialise its members 
 52   
 53  import time 
 54  import random 
 55   
 56  from ganeti import utils 
 57  from ganeti import constants 
 58  from ganeti import objects 
 59  from ganeti import serializer 
 60  from ganeti import daemon # contains AsyncUDPSocket 
 61  from ganeti import errors 
 62  from ganeti import confd 
 63  from ganeti import ssconf 
 64  from ganeti import compat 
 65  from ganeti import netutils 
66 67 68 -class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69 """Confd udp asyncore client 70 71 This is kept separate from the main ConfdClient to make sure it's easy to 72 implement a non-asyncore based client library. 73 74 """
75 - def __init__(self, client, family):
76 """Constructor for ConfdAsyncUDPClient 77 78 @type client: L{ConfdClient} 79 @param client: client library, to pass the datagrams to 80 81 """ 82 daemon.AsyncUDPSocket.__init__(self, family) 83 self.client = client
84 85 # this method is overriding a daemon.AsyncUDPSocket method
86 - def handle_datagram(self, payload, ip, port):
87 self.client.HandleResponse(payload, ip, port)
88
89 90 -class _Request(object):
91 """Request status structure. 92 93 @ivar request: the request data 94 @ivar args: any extra arguments for the callback 95 @ivar expiry: the expiry timestamp of the request 96 @ivar sent: the set of contacted peers 97 @ivar rcvd: the set of peers who replied 98 99 """
100 - def __init__(self, request, args, expiry, sent):
101 self.request = request 102 self.args = args 103 self.expiry = expiry 104 self.sent = frozenset(sent) 105 self.rcvd = set()
106
107 108 -class ConfdClient:
109 """Send queries to confd, and get back answers. 110 111 Since the confd model works by querying multiple master candidates, and 112 getting back answers, this is an asynchronous library. It can either work 113 through asyncore or with your own handling. 114 115 @type _requests: dict 116 @ivar _requests: dictionary indexes by salt, which contains data 117 about the outstanding requests; the values are objects of type 118 L{_Request} 119 120 """
121 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122 """Constructor for ConfdClient 123 124 @type hmac_key: string 125 @param hmac_key: hmac key to talk to confd 126 @type peers: list 127 @param peers: list of peer nodes 128 @type callback: f(L{ConfdUpcallPayload}) 129 @param callback: function to call when getting answers 130 @type port: integer 131 @param port: confd port (default: use GetDaemonPort) 132 @type logger: logging.Logger 133 @param logger: optional logger for internal conditions 134 135 """ 136 if not callable(callback): 137 raise errors.ProgrammerError("callback must be callable") 138 139 self.UpdatePeerList(peers) 140 self._SetPeersAddressFamily() 141 self._hmac_key = hmac_key 142 self._socket = ConfdAsyncUDPClient(self, self._family) 143 self._callback = callback 144 self._confd_port = port 145 self._logger = logger 146 self._requests = {} 147 148 if self._confd_port is None: 149 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
150
151 - def UpdatePeerList(self, peers):
152 """Update the list of peers 153 154 @type peers: list 155 @param peers: list of peer nodes 156 157 """ 158 # we are actually called from init, so: 159 # pylint: disable-msg=W0201 160 if not isinstance(peers, list): 161 raise errors.ProgrammerError("peers must be a list") 162 # make a copy of peers, since we're going to shuffle the list, later 163 self._peers = list(peers)
164
165 - def _PackRequest(self, request, now=None):
166 """Prepare a request to be sent on the wire. 167 168 This function puts a proper salt in a confd request, puts the proper salt, 169 and adds the correct magic number. 170 171 """ 172 if now is None: 173 now = time.time() 174 tstamp = '%d' % now 175 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp) 176 return confd.PackMagic(req)
177
178 - def _UnpackReply(self, payload):
179 in_payload = confd.UnpackMagic(payload) 180 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) 181 answer = objects.ConfdReply.FromDict(dict_answer) 182 return answer, salt
183
184 - def ExpireRequests(self):
185 """Delete all the expired requests. 186 187 """ 188 now = time.time() 189 for rsalt, rq in self._requests.items(): 190 if now >= rq.expiry: 191 del self._requests[rsalt] 192 client_reply = ConfdUpcallPayload(salt=rsalt, 193 type=UPCALL_EXPIRE, 194 orig_request=rq.request, 195 extra_args=rq.args, 196 client=self, 197 ) 198 self._callback(client_reply)
199
200 - def SendRequest(self, request, args=None, coverage=0, async=True):
201 """Send a confd request to some MCs 202 203 @type request: L{objects.ConfdRequest} 204 @param request: the request to send 205 @type args: tuple 206 @param args: additional callback arguments 207 @type coverage: integer 208 @param coverage: number of remote nodes to contact; if default 209 (0), it will use a reasonable default 210 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is 211 passed, it will use the maximum number of peers, otherwise the 212 number passed in will be used 213 @type async: boolean 214 @param async: handle the write asynchronously 215 216 """ 217 if coverage == 0: 218 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) 219 elif coverage == -1: 220 coverage = len(self._peers) 221 222 if coverage > len(self._peers): 223 raise errors.ConfdClientError("Not enough MCs known to provide the" 224 " desired coverage") 225 226 if not request.rsalt: 227 raise errors.ConfdClientError("Missing request rsalt") 228 229 self.ExpireRequests() 230 if request.rsalt in self._requests: 231 raise errors.ConfdClientError("Duplicate request rsalt") 232 233 if request.type not in constants.CONFD_REQS: 234 raise errors.ConfdClientError("Invalid request type") 235 236 random.shuffle(self._peers) 237 targets = self._peers[:coverage] 238 239 now = time.time() 240 payload = self._PackRequest(request, now=now) 241 242 for target in targets: 243 try: 244 self._socket.enqueue_send(target, self._confd_port, payload) 245 except errors.UdpDataSizeError: 246 raise errors.ConfdClientError("Request too big") 247 248 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT 249 self._requests[request.rsalt] = _Request(request, args, expire_time, 250 targets) 251 252 if not async: 253 self.FlushSendQueue()
254
255 - def HandleResponse(self, payload, ip, port):
256 """Asynchronous handler for a confd reply 257 258 Call the relevant callback associated to the current request. 259 260 """ 261 try: 262 try: 263 answer, salt = self._UnpackReply(payload) 264 except (errors.SignatureError, errors.ConfdMagicError), err: 265 if self._logger: 266 self._logger.debug("Discarding broken package: %s" % err) 267 return 268 269 try: 270 rq = self._requests[salt] 271 except KeyError: 272 if self._logger: 273 self._logger.debug("Discarding unknown (expired?) reply: %s" % err) 274 return 275 276 rq.rcvd.add(ip) 277 278 client_reply = ConfdUpcallPayload(salt=salt, 279 type=UPCALL_REPLY, 280 server_reply=answer, 281 orig_request=rq.request, 282 server_ip=ip, 283 server_port=port, 284 extra_args=rq.args, 285 client=self, 286 ) 287 self._callback(client_reply) 288 289 finally: 290 self.ExpireRequests()
291
292 - def FlushSendQueue(self):
293 """Send out all pending requests. 294 295 Can be used for synchronous client use. 296 297 """ 298 while self._socket.writable(): 299 self._socket.handle_write()
300
301 - def ReceiveReply(self, timeout=1):
302 """Receive one reply. 303 304 @type timeout: float 305 @param timeout: how long to wait for the reply 306 @rtype: boolean 307 @return: True if some data has been handled, False otherwise 308 309 """ 310 return self._socket.process_next_packet(timeout=timeout)
311 312 @staticmethod
313 - def _NeededReplies(peer_cnt):
314 """Compute the minimum safe number of replies for a query. 315 316 The algorithm is designed to work well for both small and big 317 number of peers: 318 - for less than three, we require all responses 319 - for less than five, we allow one miss 320 - otherwise, half the number plus one 321 322 This guarantees that we progress monotonically: 1->1, 2->2, 3->2, 323 4->2, 5->3, 6->3, 7->4, etc. 324 325 @type peer_cnt: int 326 @param peer_cnt: the number of peers contacted 327 @rtype: int 328 @return: the number of replies which should give a safe coverage 329 330 """ 331 if peer_cnt < 3: 332 return peer_cnt 333 elif peer_cnt < 5: 334 return peer_cnt - 1 335 else: 336 return int(peer_cnt/2) + 1
337
338 - def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
339 """Wait for replies to a given request. 340 341 This method will wait until either the timeout expires or a 342 minimum number (computed using L{_NeededReplies}) of replies are 343 received for the given salt. It is useful when doing synchronous 344 calls to this library. 345 346 @param salt: the salt of the request we want responses for 347 @param timeout: the maximum timeout (should be less or equal to 348 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT} 349 @rtype: tuple 350 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the 351 request is unknown, timed_out will be true and the counters 352 will be zero 353 354 """ 355 def _CheckResponse(): 356 if salt not in self._requests: 357 # expired? 358 if self._logger: 359 self._logger.debug("Discarding unknown/expired request: %s" % salt) 360 return MISSING 361 rq = self._requests[salt] 362 if len(rq.rcvd) >= expected: 363 # already got all replies 364 return (False, len(rq.sent), len(rq.rcvd)) 365 # else wait, using default timeout 366 self.ReceiveReply() 367 raise utils.RetryAgain()
368 369 MISSING = (True, 0, 0) 370 371 if salt not in self._requests: 372 return MISSING 373 # extend the expire time with the current timeout, so that we 374 # don't get the request expired from under us 375 rq = self._requests[salt] 376 rq.expiry += timeout 377 sent = len(rq.sent) 378 expected = self._NeededReplies(sent) 379 380 try: 381 return utils.Retry(_CheckResponse, 0, timeout) 382 except utils.RetryTimeout: 383 if salt in self._requests: 384 rq = self._requests[salt] 385 return (True, len(rq.sent), len(rq.rcvd)) 386 else: 387 return MISSING
388
389 - def _SetPeersAddressFamily(self):
390 if not self._peers: 391 raise errors.ConfdClientError("Peer list empty") 392 try: 393 peer = self._peers[0] 394 self._family = netutils.IPAddress.GetAddressFamily(peer) 395 for peer in self._peers[1:]: 396 if netutils.IPAddress.GetAddressFamily(peer) != self._family: 397 raise errors.ConfdClientError("Peers must be of same address family") 398 except errors.IPAddressError: 399 raise errors.ConfdClientError("Peer address %s invalid" % peer)
400 401 402 # UPCALL_REPLY: server reply upcall 403 # has all ConfdUpcallPayload fields populated 404 UPCALL_REPLY = 1 405 # UPCALL_EXPIRE: internal library request expire 406 # has only salt, type, orig_request and extra_args 407 UPCALL_EXPIRE = 2 408 CONFD_UPCALL_TYPES = frozenset([ 409 UPCALL_REPLY, 410 UPCALL_EXPIRE, 411 ])
412 413 414 -class ConfdUpcallPayload(objects.ConfigObject):
415 """Callback argument for confd replies 416 417 @type salt: string 418 @ivar salt: salt associated with the query 419 @type type: one of confd.client.CONFD_UPCALL_TYPES 420 @ivar type: upcall type (server reply, expired request, ...) 421 @type orig_request: L{objects.ConfdRequest} 422 @ivar orig_request: original request 423 @type server_reply: L{objects.ConfdReply} 424 @ivar server_reply: server reply 425 @type server_ip: string 426 @ivar server_ip: answering server ip address 427 @type server_port: int 428 @ivar server_port: answering server port 429 @type extra_args: any 430 @ivar extra_args: 'args' argument of the SendRequest function 431 @type client: L{ConfdClient} 432 @ivar client: current confd client instance 433 434 """ 435 __slots__ = [ 436 "salt", 437 "type", 438 "orig_request", 439 "server_reply", 440 "server_ip", 441 "server_port", 442 "extra_args", 443 "client", 444 ]
445
446 447 -class ConfdClientRequest(objects.ConfdRequest):
448 """This is the client-side version of ConfdRequest. 449 450 This version of the class helps creating requests, on the client side, by 451 filling in some default values. 452 453 """
454 - def __init__(self, **kwargs):
455 objects.ConfdRequest.__init__(self, **kwargs) 456 if not self.rsalt: 457 self.rsalt = utils.NewUUID() 458 if not self.protocol: 459 self.protocol = constants.CONFD_PROTOCOL_VERSION 460 if self.type not in constants.CONFD_REQS: 461 raise errors.ConfdClientError("Invalid request type")
462
463 464 -class ConfdFilterCallback:
465 """Callback that calls another callback, but filters duplicate results. 466 467 @ivar consistent: a dictionary indexed by salt; for each salt, if 468 all responses ware identical, this will be True; this is the 469 expected state on a healthy cluster; on inconsistent or 470 partitioned clusters, this might be False, if we see answers 471 with the same serial but different contents 472 473 """
474 - def __init__(self, callback, logger=None):
475 """Constructor for ConfdFilterCallback 476 477 @type callback: f(L{ConfdUpcallPayload}) 478 @param callback: function to call when getting answers 479 @type logger: logging.Logger 480 @param logger: optional logger for internal conditions 481 482 """ 483 if not callable(callback): 484 raise errors.ProgrammerError("callback must be callable") 485 486 self._callback = callback 487 self._logger = logger 488 # answers contains a dict of salt -> answer 489 self._answers = {} 490 self.consistent = {}
491
492 - def _LogFilter(self, salt, new_reply, old_reply):
493 if not self._logger: 494 return 495 496 if new_reply.serial > old_reply.serial: 497 self._logger.debug("Filtering confirming answer, with newer" 498 " serial for query %s" % salt) 499 elif new_reply.serial == old_reply.serial: 500 if new_reply.answer != old_reply.answer: 501 self._logger.warning("Got incoherent answers for query %s" 502 " (serial: %s)" % (salt, new_reply.serial)) 503 else: 504 self._logger.debug("Filtering confirming answer, with same" 505 " serial for query %s" % salt) 506 else: 507 self._logger.debug("Filtering outdated answer for query %s" 508 " serial: (%d < %d)" % (salt, old_reply.serial, 509 new_reply.serial))
510
511 - def _HandleExpire(self, up):
512 # if we have no answer we have received none, before the expiration. 513 if up.salt in self._answers: 514 del self._answers[up.salt] 515 if up.salt in self.consistent: 516 del self.consistent[up.salt]
517
518 - def _HandleReply(self, up):
519 """Handle a single confd reply, and decide whether to filter it. 520 521 @rtype: boolean 522 @return: True if the reply should be filtered, False if it should be passed 523 on to the up-callback 524 525 """ 526 filter_upcall = False 527 salt = up.salt 528 if salt not in self.consistent: 529 self.consistent[salt] = True 530 if salt not in self._answers: 531 # first answer for a query (don't filter, and record) 532 self._answers[salt] = up.server_reply 533 elif up.server_reply.serial > self._answers[salt].serial: 534 # newer answer (record, and compare contents) 535 old_answer = self._answers[salt] 536 self._answers[salt] = up.server_reply 537 if up.server_reply.answer == old_answer.answer: 538 # same content (filter) (version upgrade was unrelated) 539 filter_upcall = True 540 self._LogFilter(salt, up.server_reply, old_answer) 541 # else: different content, pass up a second answer 542 else: 543 # older or same-version answer (duplicate or outdated, filter) 544 if (up.server_reply.serial == self._answers[salt].serial and 545 up.server_reply.answer != self._answers[salt].answer): 546 self.consistent[salt] = False 547 filter_upcall = True 548 self._LogFilter(salt, up.server_reply, self._answers[salt]) 549 550 return filter_upcall
551
552 - def __call__(self, up):
553 """Filtering callback 554 555 @type up: L{ConfdUpcallPayload} 556 @param up: upper callback 557 558 """ 559 filter_upcall = False 560 if up.type == UPCALL_REPLY: 561 filter_upcall = self._HandleReply(up) 562 elif up.type == UPCALL_EXPIRE: 563 self._HandleExpire(up) 564 565 if not filter_upcall: 566 self._callback(up)
567
568 569 -class ConfdCountingCallback:
570 """Callback that calls another callback, and counts the answers 571 572 """
573 - def __init__(self, callback, logger=None):
574 """Constructor for ConfdCountingCallback 575 576 @type callback: f(L{ConfdUpcallPayload}) 577 @param callback: function to call when getting answers 578 @type logger: logging.Logger 579 @param logger: optional logger for internal conditions 580 581 """ 582 if not callable(callback): 583 raise errors.ProgrammerError("callback must be callable") 584 585 self._callback = callback 586 self._logger = logger 587 # answers contains a dict of salt -> count 588 self._answers = {}
589
590 - def RegisterQuery(self, salt):
591 if salt in self._answers: 592 raise errors.ProgrammerError("query already registered") 593 self._answers[salt] = 0
594
595 - def AllAnswered(self):
596 """Have all the registered queries received at least an answer? 597 598 """ 599 return compat.all(self._answers.values())
600
601 - def _HandleExpire(self, up):
602 # if we have no answer we have received none, before the expiration. 603 if up.salt in self._answers: 604 del self._answers[up.salt]
605
606 - def _HandleReply(self, up):
607 """Handle a single confd reply, and decide whether to filter it. 608 609 @rtype: boolean 610 @return: True if the reply should be filtered, False if it should be passed 611 on to the up-callback 612 613 """ 614 if up.salt in self._answers: 615 self._answers[up.salt] += 1
616
617 - def __call__(self, up):
618 """Filtering callback 619 620 @type up: L{ConfdUpcallPayload} 621 @param up: upper callback 622 623 """ 624 if up.type == UPCALL_REPLY: 625 self._HandleReply(up) 626 elif up.type == UPCALL_EXPIRE: 627 self._HandleExpire(up) 628 self._callback(up)
629
630 631 -class StoreResultCallback:
632 """Callback that simply stores the most recent answer. 633 634 @ivar _answers: dict of salt to (have_answer, reply) 635 636 """ 637 _NO_KEY = (False, None) 638
639 - def __init__(self):
640 """Constructor for StoreResultCallback 641 642 """ 643 # answers contains a dict of salt -> best result 644 self._answers = {}
645
646 - def GetResponse(self, salt):
647 """Return the best match for a salt 648 649 """ 650 return self._answers.get(salt, self._NO_KEY)
651
652 - def _HandleExpire(self, up):
653 """Expiration handler. 654 655 """ 656 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY: 657 del self._answers[up.salt]
658
659 - def _HandleReply(self, up):
660 """Handle a single confd reply, and decide whether to filter it. 661 662 """ 663 self._answers[up.salt] = (True, up)
664
665 - def __call__(self, up):
666 """Filtering callback 667 668 @type up: L{ConfdUpcallPayload} 669 @param up: upper callback 670 671 """ 672 if up.type == UPCALL_REPLY: 673 self._HandleReply(up) 674 elif up.type == UPCALL_EXPIRE: 675 self._HandleExpire(up)
676
677 678 -def GetConfdClient(callback):
679 """Return a client configured using the given callback. 680 681 This is handy to abstract the MC list and HMAC key reading. 682 683 @attention: This should only be called on nodes which are part of a 684 cluster, since it depends on a valid (ganeti) data directory; 685 for code running outside of a cluster, you need to create the 686 client manually 687 688 """ 689 ss = ssconf.SimpleStore() 690 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS) 691 mc_list = utils.ReadFile(mc_file).splitlines() 692 hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY) 693 return ConfdClient(hmac_key, mc_list, callback)
694