Package ganeti :: Package confd :: Module client
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.confd.client

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2009 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Ganeti confd client 
 23   
 24  Clients can use the confd client library to send requests to a group of master 
 25  candidates running confd. The expected usage is through the asyncore framework, 
 26  by sending queries, and asynchronously receiving replies through a callback. 
 27   
 28  This way the client library doesn't ever need to "wait" on a particular answer, 
 29  and can proceed even if some udp packets are lost. It's up to the user to 
 30  reschedule queries if they haven't received responses and they need them. 
 31   
 32  Example usage:: 
 33   
 34    client = ConfdClient(...) # includes callback specification 
 35    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) 
 36    client.SendRequest(req) 
 37    # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run() 
 38    # ... wait ... 
 39    # And your callback will be called by asyncore, when your query gets a 
 40    # response, or when it expires. 
 41   
 42  You can use the provided ConfdFilterCallback to act as a filter, only passing 
 43  "newer" answers to your callback, and filtering out outdated ones, or ones 
 44  confirming what you already got. 
 45   
 46  """ 
 47   
 48  # pylint: disable-msg=E0203 
 49   
 50  # E0203: Access to member %r before its definition, since we use 
 51  # objects.py which doesn't explicitly initialise its members 
 52   
 53  import time 
 54  import random 
 55   
 56  from ganeti import utils 
 57  from ganeti import constants 
 58  from ganeti import objects 
 59  from ganeti import serializer 
 60  from ganeti import daemon # contains AsyncUDPSocket 
 61  from ganeti import errors 
 62  from ganeti import confd 
 63  from ganeti import ssconf 
 64  from ganeti import compat 
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Asyncore-driven UDP socket used by the confd client.

  The socket handling lives in its own class, apart from L{ConfdClient},
  so that a client library which does not rely on asyncore stays easy to
  implement.

  """
  def __init__(self, client):
    """Set up the socket and remember who receives the datagrams.

    @type client: L{ConfdClient}
    @param client: client library instance; every received datagram is
        handed to its C{HandleResponse} method

    """
    daemon.AsyncUDPSocket.__init__(self)
    self.client = client

  def handle_datagram(self, payload, ip, port):
    """Forward a received datagram to the client library.

    This overrides the L{daemon.AsyncUDPSocket} method of the same name.

    @param payload: the raw datagram contents
    @param ip: address the datagram came from
    @param port: port the datagram came from

    """
    self.client.HandleResponse(payload, ip, port)
87
class _Request(object):
  """Bookkeeping record for one outstanding request.

  @ivar request: the request data
  @ivar args: any extra arguments for the callback
  @ivar expiry: the expiry timestamp of the request
  @ivar sent: the set of contacted peers (immutable)
  @ivar rcvd: the set of peers who replied (starts out empty)

  """
  def __init__(self, request, args, expiry, sent):
    """Record the request and the peers it was sent to.

    @param request: the request data
    @param args: extra arguments to hand back to the callback
    @param expiry: timestamp after which the request is considered expired
    @param sent: iterable of the peers that were contacted

    """
    # Nobody has answered yet; repliers are added as answers come in
    self.rcvd = set()
    # The set of contacted peers never changes after creation
    self.sent = frozenset(sent)
    self.expiry = expiry
    self.args = args
    self.request = request
105
106 107 -class ConfdClient:
108 """Send queries to confd, and get back answers. 109 110 Since the confd model works by querying multiple master candidates, and 111 getting back answers, this is an asynchronous library. It can either work 112 through asyncore or with your own handling. 113 114 @type _requests: dict 115 @ivar _requests: dictionary indexes by salt, which contains data 116 about the outstanding requests; the values are objects of type 117 L{_Request} 118 119 """
120 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121 """Constructor for ConfdClient 122 123 @type hmac_key: string 124 @param hmac_key: hmac key to talk to confd 125 @type peers: list 126 @param peers: list of peer nodes 127 @type callback: f(L{ConfdUpcallPayload}) 128 @param callback: function to call when getting answers 129 @type port: integer 130 @param port: confd port (default: use GetDaemonPort) 131 @type logger: logging.Logger 132 @param logger: optional logger for internal conditions 133 134 """ 135 if not callable(callback): 136 raise errors.ProgrammerError("callback must be callable") 137 138 self.UpdatePeerList(peers) 139 self._hmac_key = hmac_key 140 self._socket = ConfdAsyncUDPClient(self) 141 self._callback = callback 142 self._confd_port = port 143 self._logger = logger 144 self._requests = {} 145 146 if self._confd_port is None: 147 self._confd_port = utils.GetDaemonPort(constants.CONFD)
148
149 - def UpdatePeerList(self, peers):
150 """Update the list of peers 151 152 @type peers: list 153 @param peers: list of peer nodes 154 155 """ 156 # we are actually called from init, so: 157 # pylint: disable-msg=W0201 158 if not isinstance(peers, list): 159 raise errors.ProgrammerError("peers must be a list") 160 # make a copy of peers, since we're going to shuffle the list, later 161 self._peers = list(peers)
162
163 - def _PackRequest(self, request, now=None):
164 """Prepare a request to be sent on the wire. 165 166 This function puts a proper salt in a confd request, puts the proper salt, 167 and adds the correct magic number. 168 169 """ 170 if now is None: 171 now = time.time() 172 tstamp = '%d' % now 173 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp) 174 return confd.PackMagic(req)
175
176 - def _UnpackReply(self, payload):
177 in_payload = confd.UnpackMagic(payload) 178 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) 179 answer = objects.ConfdReply.FromDict(dict_answer) 180 return answer, salt
181
182 - def ExpireRequests(self):
183 """Delete all the expired requests. 184 185 """ 186 now = time.time() 187 for rsalt, rq in self._requests.items(): 188 if now >= rq.expiry: 189 del self._requests[rsalt] 190 client_reply = ConfdUpcallPayload(salt=rsalt, 191 type=UPCALL_EXPIRE, 192 orig_request=rq.request, 193 extra_args=rq.args, 194 client=self, 195 ) 196 self._callback(client_reply)
197
198 - def SendRequest(self, request, args=None, coverage=0, async=True):
199 """Send a confd request to some MCs 200 201 @type request: L{objects.ConfdRequest} 202 @param request: the request to send 203 @type args: tuple 204 @param args: additional callback arguments 205 @type coverage: integer 206 @param coverage: number of remote nodes to contact; if default 207 (0), it will use a reasonable default 208 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is 209 passed, it will use the maximum number of peers, otherwise the 210 number passed in will be used 211 @type async: boolean 212 @param async: handle the write asynchronously 213 214 """ 215 if coverage == 0: 216 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) 217 elif coverage == -1: 218 coverage = len(self._peers) 219 220 if coverage > len(self._peers): 221 raise errors.ConfdClientError("Not enough MCs known to provide the" 222 " desired coverage") 223 224 if not request.rsalt: 225 raise errors.ConfdClientError("Missing request rsalt") 226 227 self.ExpireRequests() 228 if request.rsalt in self._requests: 229 raise errors.ConfdClientError("Duplicate request rsalt") 230 231 if request.type not in constants.CONFD_REQS: 232 raise errors.ConfdClientError("Invalid request type") 233 234 random.shuffle(self._peers) 235 targets = self._peers[:coverage] 236 237 now = time.time() 238 payload = self._PackRequest(request, now=now) 239 240 for target in targets: 241 try: 242 self._socket.enqueue_send(target, self._confd_port, payload) 243 except errors.UdpDataSizeError: 244 raise errors.ConfdClientError("Request too big") 245 246 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT 247 self._requests[request.rsalt] = _Request(request, args, expire_time, 248 targets) 249 250 if not async: 251 self.FlushSendQueue()
252
253 - def HandleResponse(self, payload, ip, port):
254 """Asynchronous handler for a confd reply 255 256 Call the relevant callback associated to the current request. 257 258 """ 259 try: 260 try: 261 answer, salt = self._UnpackReply(payload) 262 except (errors.SignatureError, errors.ConfdMagicError), err: 263 if self._logger: 264 self._logger.debug("Discarding broken package: %s" % err) 265 return 266 267 try: 268 rq = self._requests[salt] 269 except KeyError: 270 if self._logger: 271 self._logger.debug("Discarding unknown (expired?) reply: %s" % err) 272 return 273 274 rq.rcvd.add(ip) 275 276 client_reply = ConfdUpcallPayload(salt=salt, 277 type=UPCALL_REPLY, 278 server_reply=answer, 279 orig_request=rq.request, 280 server_ip=ip, 281 server_port=port, 282 extra_args=rq.args, 283 client=self, 284 ) 285 self._callback(client_reply) 286 287 finally: 288 self.ExpireRequests()
289
290 - def FlushSendQueue(self):
291 """Send out all pending requests. 292 293 Can be used for synchronous client use. 294 295 """ 296 while self._socket.writable(): 297 self._socket.handle_write()
298
299 - def ReceiveReply(self, timeout=1):
300 """Receive one reply. 301 302 @type timeout: float 303 @param timeout: how long to wait for the reply 304 @rtype: boolean 305 @return: True if some data has been handled, False otherwise 306 307 """ 308 return self._socket.process_next_packet(timeout=timeout)
309 310 @staticmethod
311 - def _NeededReplies(peer_cnt):
312 """Compute the minimum safe number of replies for a query. 313 314 The algorithm is designed to work well for both small and big 315 number of peers: 316 - for less than three, we require all responses 317 - for less than five, we allow one miss 318 - otherwise, half the number plus one 319 320 This guarantees that we progress monotonically: 1->1, 2->2, 3->2, 321 4->2, 5->3, 6->3, 7->4, etc. 322 323 @type peer_cnt: int 324 @param peer_cnt: the number of peers contacted 325 @rtype: int 326 @return: the number of replies which should give a safe coverage 327 328 """ 329 if peer_cnt < 3: 330 return peer_cnt 331 elif peer_cnt < 5: 332 return peer_cnt - 1 333 else: 334 return int(peer_cnt/2) + 1
335
336 - def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
337 """Wait for replies to a given request. 338 339 This method will wait until either the timeout expires or a 340 minimum number (computed using L{_NeededReplies}) of replies are 341 received for the given salt. It is useful when doing synchronous 342 calls to this library. 343 344 @param salt: the salt of the request we want responses for 345 @param timeout: the maximum timeout (should be less or equal to 346 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT} 347 @rtype: tuple 348 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the 349 request is unknown, timed_out will be true and the counters 350 will be zero 351 352 """ 353 def _CheckResponse(): 354 if salt not in self._requests: 355 # expired? 356 if self._logger: 357 self._logger.debug("Discarding unknown/expired request: %s" % salt) 358 return MISSING 359 rq = self._requests[salt] 360 if len(rq.rcvd) >= expected: 361 # already got all replies 362 return (False, len(rq.sent), len(rq.rcvd)) 363 # else wait, using default timeout 364 self.ReceiveReply() 365 raise utils.RetryAgain()
366 367 MISSING = (True, 0, 0) 368 369 if salt not in self._requests: 370 return MISSING 371 # extend the expire time with the current timeout, so that we 372 # don't get the request expired from under us 373 rq = self._requests[salt] 374 rq.expiry += timeout 375 sent = len(rq.sent) 376 expected = self._NeededReplies(sent) 377 378 try: 379 return utils.Retry(_CheckResponse, 0, timeout) 380 except utils.RetryTimeout: 381 if salt in self._requests: 382 rq = self._requests[salt] 383 return (True, len(rq.sent), len(rq.rcvd)) 384 else: 385 return MISSING
# Kinds of upcall a client callback can receive (ConfdUpcallPayload.type).
#
# UPCALL_REPLY: a server answered the request; every ConfdUpcallPayload
# field is populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: the library expired the request on its own; only salt,
# type, orig_request, extra_args and client are populated
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset((UPCALL_REPLY, UPCALL_EXPIRE))
class ConfdUpcallPayload(objects.ConfigObject):
  """Argument handed to client callbacks.

  Which fields are filled in depends on the upcall type: an expire
  upcall is built without the server related fields.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  __slots__ = [
    # identification of the query and of the kind of upcall
    "salt",
    "type",
    "orig_request",
    # data about the answering server
    "server_reply",
    "server_ip",
    "server_port",
    # caller supplied data and originating client
    "extra_args",
    "client",
    ]
431
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of ConfdRequest.

  Compared to the base class, this one makes building requests on the
  client easier by supplying default values for some fields.

  """
  def __init__(self, **kwargs):
    """Build the request, filling in whatever was left unset.

    A missing rsalt is replaced by a new UUID and a missing protocol by
    L{constants.CONFD_PROTOCOL_VERSION}.

    @param kwargs: field values, passed on to L{objects.ConfdRequest}
    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)

    # Defaults for the fields the caller did not provide
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION

    # The type has no default: it must be given and must be known
    known_type = self.type in constants.CONFD_REQS
    if not known_type:
      raise errors.ConfdClientError("Invalid request type")
448
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if C{callback} is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered; does nothing without a logger.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply which was just received
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # here the new reply carries the smaller serial, so it has to be
      # the left-hand side of the "<" shown in the message
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
        on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are passed on unless L{_HandleReply} decides to filter them;
    expire upcalls clear the recorded state and are always passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
553
class ConfdCountingCallback:
  """Callback that calls another callback, and counts the answers

  Only queries announced through L{RegisterQuery} are counted.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # answers contains a dict of salt -> count
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting the answers for a query.

    @param salt: the salt of the query
    @raise errors.ProgrammerError: if the salt is already registered

    """
    if salt in self._answers:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    """
    counts = self._answers.values()
    return compat.all(counts)

  def _HandleExpire(self, up):
    """Stop tracking a query once it has expired.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    # an unregistered (or already dropped) salt is silently ignored
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Count one reply, if it belongs to a registered query.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall

    """
    salt = up.salt
    if salt not in self._answers:
      return
    self._answers[salt] += 1

  def __call__(self, up):
    """Counting callback

    Updates the counters and then always invokes the wrapped callback.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
    self._callback(up)
615
class StoreResultCallback:
  """Callback that simply stores the most recent answer.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # what GetResponse hands out when nothing is stored for a salt
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # answers contains a dict of salt -> best result
    self._answers = {}

  def GetResponse(self, salt):
    """Return the best match for a salt

    @param salt: the salt of the query
    @rtype: tuple
    @return: the stored (have_answer, reply) pair, or (False, None) if
        nothing is stored for the salt

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    Drops the entry for the salt, but only if it holds no answer.

    """
    salt = up.salt
    if self._answers.get(salt) == self._NO_KEY:
      del self._answers[salt]

  def _HandleReply(self, up):
    """Store a reply, replacing whatever was stored for its salt.

    """
    self._answers[up.salt] = (True, up)

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    kind = up.type
    if kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
    elif kind == UPCALL_REPLY:
      self._HandleReply(up)
662
def GetConfdClient(callback):
  """Build a L{ConfdClient} for the local cluster around a callback.

  This takes care of reading the master candidate list and the HMAC key,
  so that callers do not have to.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually
  @param callback: function to call when getting answers
  @rtype: L{ConfdClient}
  @return: a client using the peers and key read from disk

  """
  # one master candidate IP per line in the ssconf file
  candidates_path = ssconf.SimpleStore().KeyToFilename(
    constants.SS_MASTER_CANDIDATES_IPS)
  peers = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(hmac_key=key, peers=peers, callback=callback)
680