1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 # ... wait ...
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48
49
50
51
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
66 from ganeti import pathutils
70 """Confd udp asyncore client
71
72 This is kept separate from the main ConfdClient to make sure it's easy to
73 implement a non-asyncore based client library.
74
75 """
85
86
89
92 """Request status structure.
93
94 @ivar request: the request data
95 @ivar args: any extra arguments for the callback
96 @ivar expiry: the expiry timestamp of the request
97 @ivar sent: the set of contacted peers
98 @ivar rcvd: the set of peers who replied
99
100 """
def __init__(self, request, args, expiry, sent):
  """Record the bookkeeping data of one outstanding request.

  @param request: the request data
  @param args: any extra arguments for the callback
  @param expiry: the expiry timestamp of the request
  @param sent: iterable of contacted peers; it is stored as a frozenset

  """
  # Nobody has replied yet; repliers get added as answers come in
  self.rcvd = set()
  # The set of contacted peers is frozen at creation time
  self.sent = frozenset(sent)
  self.expiry = expiry
  self.args = args
  self.request = request
107
110 """Send queries to confd, and get back answers.
111
112 Since the confd model works by querying multiple master candidates, and
113 getting back answers, this is an asynchronous library. It can either work
114 through asyncore or with your own handling.
115
116 @type _requests: dict
117 @ivar _requests: dictionary indexed by salt, which contains data
118 about the outstanding requests; the values are objects of type
119 L{_Request}
120
121 """
def __init__(self, hmac_key, peers, callback, port=None, logger=None):
  """Constructor for ConfdClient

  @type hmac_key: string
  @param hmac_key: hmac key to talk to confd
  @type peers: list
  @param peers: list of peer nodes
  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type port: integer
  @param port: confd port (default: use GetDaemonPort)
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if C{callback} is not callable

  """
  if not callable(callback):
    raise errors.ProgrammerError("callback must be callable")

  # The peer list is stored first; self._family is read right below when the
  # UDP client is created (presumably set by _SetPeersAddressFamily from the
  # stored peers -- confirm)
  self.UpdatePeerList(peers)
  self._SetPeersAddressFamily()

  self._hmac_key = hmac_key
  self._socket = ConfdAsyncUDPClient(self, self._family)
  self._callback = callback
  self._logger = logger
  # Outstanding requests, keyed by request salt
  self._requests = {}

  # Only look up the daemon port when the caller did not provide one
  self._confd_port = port
  if port is None:
    self._confd_port = netutils.GetDaemonPort(constants.CONFD)
151
153 """Update the list of peers
154
155 @type peers: list
156 @param peers: list of peer nodes
157
158 """
159
160
161 if not isinstance(peers, list):
162 raise errors.ProgrammerError("peers must be a list")
163
164 self._peers = list(peers)
165
167 """Prepare a request to be sent on the wire.
168
169 This function puts a proper salt (the timestamp) in a confd request, signs
170 it with the HMAC key, and adds the correct magic number.
171
172 """
173 if now is None:
174 now = time.time()
175 tstamp = "%d" % now
176 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
177 return confd.PackMagic(req)
178
184
def ExpireRequests(self):
  """Delete all the expired requests.

  Every outstanding request whose expiry timestamp has been reached is
  removed from C{self._requests} and reported to the client callback through
  an upcall of type L{UPCALL_EXPIRE}.

  """
  # NOTE(review): the "def" line of this method is not part of the visible
  # source; the signature is reconstructed from the self.ExpireRequests()
  # call sites -- confirm.
  now = time.time()
  # Work on a snapshot: entries are deleted inside the loop, which is an error
  # when iterating a live dictionary view
  for rsalt, rq in list(self._requests.items()):
    if now < rq.expiry:
      continue
    if self._requests.get(rsalt) is not rq:
      # The callback below may send a new request, which expires requests
      # again; skip anything such a nested run already removed or replaced
      continue
    del self._requests[rsalt]
    client_reply = ConfdUpcallPayload(salt=rsalt,
                                      type=UPCALL_EXPIRE,
                                      orig_request=rq.request,
                                      extra_args=rq.args,
                                      client=self,
                                      )
    self._callback(client_reply)
200
def SendRequest(self, request, args=None, coverage=0, async_=True, **kwargs):
  """Send a confd request to some MCs

  @type request: L{objects.ConfdRequest}
  @param request: the request to send
  @type args: tuple
  @param args: additional callback arguments
  @type coverage: integer
  @param coverage: number of remote nodes to contact; if default
      (0), it will use a reasonable default
      (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
      passed, it will use the maximum number of peers, otherwise the
      number passed in will be used
  @type async_: boolean
  @param async_: handle the write asynchronously; the historical keyword
      spelling C{async} is still accepted
  @raise errors.ConfdClientError: if the coverage cannot be satisfied, the
      request has no salt or a duplicate one, its type is invalid, or it
      is too big to be sent

  """
  # "async" is a reserved word from Python 3.7 on and cannot be a parameter
  # name there; keep accepting it as a keyword so existing callers still work
  if "async" in kwargs:
    async_ = kwargs.pop("async")
  if kwargs:
    raise TypeError("SendRequest() got unexpected keyword argument(s): %s" %
                    ", ".join(sorted(kwargs)))

  peer_cnt = len(self._peers)
  if coverage == 0:
    coverage = min(peer_cnt, constants.CONFD_DEFAULT_REQ_COVERAGE)
  elif coverage == -1:
    coverage = peer_cnt

  if coverage > peer_cnt:
    raise errors.ConfdClientError("Not enough MCs known to provide the"
                                  " desired coverage")

  if not request.rsalt:
    raise errors.ConfdClientError("Missing request rsalt")

  # Drop stale entries first, so that an expired salt can be reused
  self.ExpireRequests()
  if request.rsalt in self._requests:
    raise errors.ConfdClientError("Duplicate request rsalt")

  if request.type not in constants.CONFD_REQS:
    raise errors.ConfdClientError("Invalid request type")

  # Shuffling in place spreads the queries over the known peers
  random.shuffle(self._peers)
  targets = self._peers[:coverage]

  now = time.time()
  payload = self._PackRequest(request, now=now)

  for target in targets:
    try:
      self._socket.enqueue_send(target, self._confd_port, payload)
    except errors.UdpDataSizeError:
      raise errors.ConfdClientError("Request too big")

  expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
  self._requests[request.rsalt] = _Request(request, args, expire_time,
                                           targets)

  if not async_:
    self.FlushSendQueue()
255
def HandleResponse(self, payload, ip, port):
  """Asynchronous handler for a confd reply

  Call the relevant callback associated to the current request. Replies
  which cannot be unpacked, or which do not match an outstanding request,
  are discarded (and logged at debug level if a logger is set). Expired
  requests are purged before returning, whatever the outcome.

  @param payload: the received reply, as handed to C{self._UnpackReply}
  @param ip: address of the answering server
  @param port: port of the answering server

  """
  # NOTE(review): the "def" line of this method is not part of the visible
  # source; the parameters follow from their use below, the method name is
  # reconstructed -- confirm.
  try:
    try:
      answer, salt = self._UnpackReply(payload)
    except (errors.SignatureError, errors.ConfdMagicError) as err:
      if self._logger:
        self._logger.debug("Discarding broken package: %s" % err)
      return

    try:
      rq = self._requests[salt]
    except KeyError:
      if self._logger:
        # No exception object is bound on this path; the salt is what
        # identifies the reply being dropped
        self._logger.debug("Discarding unknown (expired?) reply: %s" % salt)
      return

    rq.rcvd.add(ip)

    client_reply = ConfdUpcallPayload(salt=salt,
                                      type=UPCALL_REPLY,
                                      server_reply=answer,
                                      orig_request=rq.request,
                                      server_ip=ip,
                                      server_port=port,
                                      extra_args=rq.args,
                                      client=self,
                                      )
    self._callback(client_reply)

  finally:
    self.ExpireRequests()
292
294 """Send out all pending requests.
295
296 Can be used for synchronous client use.
297
298 """
299 while self._socket.writable():
300 self._socket.handle_write()
301
303 """Receive one reply.
304
305 @type timeout: float
306 @param timeout: how long to wait for the reply
307 @rtype: boolean
308 @return: True if some data has been handled, False otherwise
309
310 """
311 return self._socket.process_next_packet(timeout=timeout)
312
@staticmethod
def _NeededReplies(peer_cnt):
  """Compute the minimum safe number of replies for a query.

  The algorithm is designed to work well for both small and big
  number of peers:
    - for less than three, we require all responses
    - for less than five, we allow one miss
    - otherwise, half the number plus one

  This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
  4->3, 5->3, 6->4, 7->4, etc.

  @type peer_cnt: int
  @param peer_cnt: the number of peers contacted
  @rtype: int
  @return: the number of replies which should give a safe coverage

  """
  # NOTE(review): the "def" line of this method is not part of the visible
  # source; the signature is reconstructed from the decorator, the docstring
  # and the self._NeededReplies(sent) call -- confirm.
  if peer_cnt < 3:
    return peer_cnt
  if peer_cnt < 5:
    return peer_cnt - 1
  # Floor division keeps the result an integer without a float round-trip
  return peer_cnt // 2 + 1
338
340 """Wait for replies to a given request.
341
342 This method will wait until either the timeout expires or a
343 minimum number (computed using L{_NeededReplies}) of replies are
344 received for the given salt. It is useful when doing synchronous
345 calls to this library.
346
347 @param salt: the salt of the request we want responses for
348 @param timeout: the maximum timeout (should be less or equal to
349 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT})
350 @rtype: tuple
351 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
352 request is unknown, timed_out will be true and the counters
353 will be zero
354
355 """
356 def _CheckResponse():
357 if salt not in self._requests:
358
359 if self._logger:
360 self._logger.debug("Discarding unknown/expired request: %s" % salt)
361 return MISSING
362 rq = self._requests[salt]
363 if len(rq.rcvd) >= expected:
364
365 return (False, len(rq.sent), len(rq.rcvd))
366
367 self.ReceiveReply()
368 raise utils.RetryAgain()
369
370 MISSING = (True, 0, 0)
371
372 if salt not in self._requests:
373 return MISSING
374
375
376 rq = self._requests[salt]
377 rq.expiry += timeout
378 sent = len(rq.sent)
379 expected = self._NeededReplies(sent)
380
381 try:
382 return utils.Retry(_CheckResponse, 0, timeout)
383 except utils.RetryTimeout:
384 if salt in self._requests:
385 rq = self._requests[salt]
386 return (True, len(rq.sent), len(rq.rcvd))
387 else:
388 return MISSING
389
401
402
403
404
405 UPCALL_REPLY = 1
406
407
408 UPCALL_EXPIRE = 2
409 CONFD_UPCALL_TYPES = compat.UniqueFrozenset([
410 UPCALL_REPLY,
411 UPCALL_EXPIRE,
412 ])
416 """Callback argument for confd replies
417
418 @type salt: string
419 @ivar salt: salt associated with the query
420 @type type: one of confd.client.CONFD_UPCALL_TYPES
421 @ivar type: upcall type (server reply, expired request, ...)
422 @type orig_request: L{objects.ConfdRequest}
423 @ivar orig_request: original request
424 @type server_reply: L{objects.ConfdReply}
425 @ivar server_reply: server reply
426 @type server_ip: string
427 @ivar server_ip: answering server ip address
428 @type server_port: int
429 @ivar server_port: answering server port
430 @type extra_args: any
431 @ivar extra_args: 'args' argument of the SendRequest function
432 @type client: L{ConfdClient}
433 @ivar client: current confd client instance
434
435 """
436 __slots__ = [
437 "salt",
438 "type",
439 "orig_request",
440 "server_reply",
441 "server_ip",
442 "server_port",
443 "extra_args",
444 "client",
445 ]
446
449 """This is the client-side version of ConfdRequest.
450
451 This version of the class helps creating requests, on the client side, by
452 filling in some default values.
453
454 """
463
466 """Callback that calls another callback, but filters duplicate results.
467
468 @ivar consistent: a dictionary indexed by salt; for each salt, if
469 all responses were identical, this will be True; this is the
470 expected state on a healthy cluster; on inconsistent or
471 partitioned clusters, this might be False, if we see answers
472 with the same serial but different contents
473
474 """
def __init__(self, callback, logger=None):
  """Constructor for ConfdFilterCallback

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if C{callback} is not callable

  """
  if not callable(callback):
    raise errors.ProgrammerError("callback must be callable")

  self._callback, self._logger = callback, logger

  # Both tables are keyed by salt and start out empty
  self._answers = {}
  self.consistent = {}
492
def _LogFilter(self, salt, new_reply, old_reply):
  """Log why a reply is being filtered.

  Nothing is logged if no logger was configured. Replies with equal serial
  but different answers are reported as a warning; every other case is
  logged at debug level.

  @param salt: salt of the query both replies belong to
  @param new_reply: the reply being filtered; its C{serial} and C{answer}
      attributes are read
  @param old_reply: the reply it is compared against; its C{serial} and
      C{answer} attributes are read

  """
  if not self._logger:
    return

  if new_reply.serial > old_reply.serial:
    self._logger.debug("Filtering confirming answer, with newer"
                       " serial for query %s" % salt)
  elif new_reply.serial == old_reply.serial:
    if new_reply.answer != old_reply.answer:
      self._logger.warning("Got incoherent answers for query %s"
                           " (serial: %s)" % (salt, new_reply.serial))
    else:
      self._logger.debug("Filtering confirming answer, with same"
                         " serial for query %s" % salt)
  else:
    # The new reply carries the lower serial here, so it belongs on the left
    # of the "<" for the logged inequality to be true
    self._logger.debug("Filtering outdated answer for query %s"
                       " serial: (%d < %d)" % (salt, new_reply.serial,
                                               old_reply.serial))
511
513
514 if up.salt in self._answers:
515 del self._answers[up.salt]
516 if up.salt in self.consistent:
517 del self.consistent[up.salt]
518
520 """Handle a single confd reply, and decide whether to filter it.
521
522 @rtype: boolean
523 @return: True if the reply should be filtered, False if it should be passed
524 on to the up-callback
525
526 """
527 filter_upcall = False
528 salt = up.salt
529 if salt not in self.consistent:
530 self.consistent[salt] = True
531 if salt not in self._answers:
532
533 self._answers[salt] = up.server_reply
534 elif up.server_reply.serial > self._answers[salt].serial:
535
536 old_answer = self._answers[salt]
537 self._answers[salt] = up.server_reply
538 if up.server_reply.answer == old_answer.answer:
539
540 filter_upcall = True
541 self._LogFilter(salt, up.server_reply, old_answer)
542
543 else:
544
545 if (up.server_reply.serial == self._answers[salt].serial and
546 up.server_reply.answer != self._answers[salt].answer):
547 self.consistent[salt] = False
548 filter_upcall = True
549 self._LogFilter(salt, up.server_reply, self._answers[salt])
550
551 return filter_upcall
552
554 """Filtering callback
555
556 @type up: L{ConfdUpcallPayload}
557 @param up: the upcall payload to filter and pass on
558
559 """
560 filter_upcall = False
561 if up.type == UPCALL_REPLY:
562 filter_upcall = self._HandleReply(up)
563 elif up.type == UPCALL_EXPIRE:
564 self._HandleExpire(up)
565
566 if not filter_upcall:
567 self._callback(up)
568
571 """Callback that calls another callback, and counts the answers
572
573 """
def __init__(self, callback, logger=None):
  """Constructor for ConfdCountingCallback

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if C{callback} is not callable

  """
  if not callable(callback):
    raise errors.ProgrammerError("callback must be callable")

  # No query is registered at construction time
  self._answers = {}
  self._logger = logger
  self._callback = callback
590
592 if salt in self._answers:
593 raise errors.ProgrammerError("query already registered")
594 self._answers[salt] = 0
595
597 """Have all the registered queries received at least an answer?
598
599 """
600 return compat.all(self._answers.values())
601
603
604 if up.salt in self._answers:
605 del self._answers[up.salt]
606
608 """Count a single confd reply.
609
610 The answer counter of the reply's salt is incremented, but only if the
611 query has been registered; replies are never filtered and nothing is
612 returned.
613
614 """
615 if up.salt in self._answers:
616 self._answers[up.salt] += 1
617
619 """Filtering callback
620
621 @type up: L{ConfdUpcallPayload}
622 @param up: the upcall payload to count and pass on
623
624 """
625 if up.type == UPCALL_REPLY:
626 self._HandleReply(up)
627 elif up.type == UPCALL_EXPIRE:
628 self._HandleExpire(up)
629 self._callback(up)
630
633 """Callback that simply stores the most recent answer.
634
635 @ivar _answers: dict of salt to (have_answer, reply)
636
637 """
638 _NO_KEY = (False, None)
639
641 """Constructor for StoreResultCallback
642
643 """
644
645 self._answers = {}
646
648 """Return the best match for a salt
649
650 """
651 return self._answers.get(salt, self._NO_KEY)
652
654 """Expiration handler.
655
656 """
657 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
658 del self._answers[up.salt]
659
661 """Store the upcall payload as the most recent answer for its salt.
662
663 """
664 self._answers[up.salt] = (True, up)
665
677
680 """Return a client configured using the given callback.
681
682 This is handy to abstract the MC list and HMAC key reading.
683
684 @attention: This should only be called on nodes which are part of a
685 cluster, since it depends on a valid (ganeti) data directory;
686 for code running outside of a cluster, you need to create the
687 client manually
688
689 """
690 ss = ssconf.SimpleStore()
691 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
692 mc_list = utils.ReadFile(mc_file).splitlines()
693 hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
694 return ConfdClient(hmac_key, mc_list, callback)
695