1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 # ... wait ...
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48
49
50
51
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
68 """Confd udp asyncore client
69
70 This is kept separate from the main ConfdClient to make sure it's easy to
71 implement a non-asyncore based client library.
72
73 """
75 """Constructor for ConfdAsyncUDPClient
76
77 @type client: L{ConfdClient}
78 @param client: client library, to pass the datagrams to
79
80 """
81 daemon.AsyncUDPSocket.__init__(self)
82 self.client = client
83
84
87
90 """Request status structure.
91
92 @ivar request: the request data
93 @ivar args: any extra arguments for the callback
94 @ivar expiry: the expiry timestamp of the request
95 @ivar sent: the set of contacted peers
96 @ivar rcvd: the set of peers who replied
97
98 """
99 - def __init__(self, request, args, expiry, sent):
100 self.request = request
101 self.args = args
102 self.expiry = expiry
103 self.sent = frozenset(sent)
104 self.rcvd = set()
105
108 """Send queries to confd, and get back answers.
109
110 Since the confd model works by querying multiple master candidates, and
111 getting back answers, this is an asynchronous library. It can either work
112 through asyncore or with your own handling.
113
114 @type _requests: dict
115 @ivar _requests: dictionary indexes by salt, which contains data
116 about the outstanding requests; the values are objects of type
117 L{_Request}
118
119 """
120 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121 """Constructor for ConfdClient
122
123 @type hmac_key: string
124 @param hmac_key: hmac key to talk to confd
125 @type peers: list
126 @param peers: list of peer nodes
127 @type callback: f(L{ConfdUpcallPayload})
128 @param callback: function to call when getting answers
129 @type port: integer
130 @param port: confd port (default: use GetDaemonPort)
131 @type logger: logging.Logger
132 @param logger: optional logger for internal conditions
133
134 """
135 if not callable(callback):
136 raise errors.ProgrammerError("callback must be callable")
137
138 self.UpdatePeerList(peers)
139 self._hmac_key = hmac_key
140 self._socket = ConfdAsyncUDPClient(self)
141 self._callback = callback
142 self._confd_port = port
143 self._logger = logger
144 self._requests = {}
145
146 if self._confd_port is None:
147 self._confd_port = utils.GetDaemonPort(constants.CONFD)
148
150 """Update the list of peers
151
152 @type peers: list
153 @param peers: list of peer nodes
154
155 """
156
157
158 if not isinstance(peers, list):
159 raise errors.ProgrammerError("peers must be a list")
160
161 self._peers = list(peers)
162
164 """Prepare a request to be sent on the wire.
165
166 This function puts a proper salt in a confd request, puts the proper salt,
167 and adds the correct magic number.
168
169 """
170 if now is None:
171 now = time.time()
172 tstamp = '%d' % now
173 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
174 return confd.PackMagic(req)
175
181
183 """Delete all the expired requests.
184
185 """
186 now = time.time()
187 for rsalt, rq in self._requests.items():
188 if now >= rq.expiry:
189 del self._requests[rsalt]
190 client_reply = ConfdUpcallPayload(salt=rsalt,
191 type=UPCALL_EXPIRE,
192 orig_request=rq.request,
193 extra_args=rq.args,
194 client=self,
195 )
196 self._callback(client_reply)
197
198 - def SendRequest(self, request, args=None, coverage=0, async=True):
199 """Send a confd request to some MCs
200
201 @type request: L{objects.ConfdRequest}
202 @param request: the request to send
203 @type args: tuple
204 @param args: additional callback arguments
205 @type coverage: integer
206 @param coverage: number of remote nodes to contact; if default
207 (0), it will use a reasonable default
208 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
209 passed, it will use the maximum number of peers, otherwise the
210 number passed in will be used
211 @type async: boolean
212 @param async: handle the write asynchronously
213
214 """
215 if coverage == 0:
216 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
217 elif coverage == -1:
218 coverage = len(self._peers)
219
220 if coverage > len(self._peers):
221 raise errors.ConfdClientError("Not enough MCs known to provide the"
222 " desired coverage")
223
224 if not request.rsalt:
225 raise errors.ConfdClientError("Missing request rsalt")
226
227 self.ExpireRequests()
228 if request.rsalt in self._requests:
229 raise errors.ConfdClientError("Duplicate request rsalt")
230
231 if request.type not in constants.CONFD_REQS:
232 raise errors.ConfdClientError("Invalid request type")
233
234 random.shuffle(self._peers)
235 targets = self._peers[:coverage]
236
237 now = time.time()
238 payload = self._PackRequest(request, now=now)
239
240 for target in targets:
241 try:
242 self._socket.enqueue_send(target, self._confd_port, payload)
243 except errors.UdpDataSizeError:
244 raise errors.ConfdClientError("Request too big")
245
246 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
247 self._requests[request.rsalt] = _Request(request, args, expire_time,
248 targets)
249
250 if not async:
251 self.FlushSendQueue()
252
254 """Asynchronous handler for a confd reply
255
256 Call the relevant callback associated to the current request.
257
258 """
259 try:
260 try:
261 answer, salt = self._UnpackReply(payload)
262 except (errors.SignatureError, errors.ConfdMagicError), err:
263 if self._logger:
264 self._logger.debug("Discarding broken package: %s" % err)
265 return
266
267 try:
268 rq = self._requests[salt]
269 except KeyError:
270 if self._logger:
271 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
272 return
273
274 rq.rcvd.add(ip)
275
276 client_reply = ConfdUpcallPayload(salt=salt,
277 type=UPCALL_REPLY,
278 server_reply=answer,
279 orig_request=rq.request,
280 server_ip=ip,
281 server_port=port,
282 extra_args=rq.args,
283 client=self,
284 )
285 self._callback(client_reply)
286
287 finally:
288 self.ExpireRequests()
289
291 """Send out all pending requests.
292
293 Can be used for synchronous client use.
294
295 """
296 while self._socket.writable():
297 self._socket.handle_write()
298
300 """Receive one reply.
301
302 @type timeout: float
303 @param timeout: how long to wait for the reply
304 @rtype: boolean
305 @return: True if some data has been handled, False otherwise
306
307 """
308 return self._socket.process_next_packet(timeout=timeout)
309
310 @staticmethod
312 """Compute the minimum safe number of replies for a query.
313
314 The algorithm is designed to work well for both small and big
315 number of peers:
316 - for less than three, we require all responses
317 - for less than five, we allow one miss
318 - otherwise, half the number plus one
319
320 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
321 4->2, 5->3, 6->3, 7->4, etc.
322
323 @type peer_cnt: int
324 @param peer_cnt: the number of peers contacted
325 @rtype: int
326 @return: the number of replies which should give a safe coverage
327
328 """
329 if peer_cnt < 3:
330 return peer_cnt
331 elif peer_cnt < 5:
332 return peer_cnt - 1
333 else:
334 return int(peer_cnt/2) + 1
335
337 """Wait for replies to a given request.
338
339 This method will wait until either the timeout expires or a
340 minimum number (computed using L{_NeededReplies}) of replies are
341 received for the given salt. It is useful when doing synchronous
342 calls to this library.
343
344 @param salt: the salt of the request we want responses for
345 @param timeout: the maximum timeout (should be less or equal to
346 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
347 @rtype: tuple
348 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
349 request is unknown, timed_out will be true and the counters
350 will be zero
351
352 """
353 def _CheckResponse():
354 if salt not in self._requests:
355
356 if self._logger:
357 self._logger.debug("Discarding unknown/expired request: %s" % salt)
358 return MISSING
359 rq = self._requests[salt]
360 if len(rq.rcvd) >= expected:
361
362 return (False, len(rq.sent), len(rq.rcvd))
363
364 self.ReceiveReply()
365 raise utils.RetryAgain()
366
367 MISSING = (True, 0, 0)
368
369 if salt not in self._requests:
370 return MISSING
371
372
373 rq = self._requests[salt]
374 rq.expiry += timeout
375 sent = len(rq.sent)
376 expected = self._NeededReplies(sent)
377
378 try:
379 return utils.Retry(_CheckResponse, 0, timeout)
380 except utils.RetryTimeout:
381 if salt in self._requests:
382 rq = self._requests[salt]
383 return (True, len(rq.sent), len(rq.rcvd))
384 else:
385 return MISSING
386
387
388
389
390 UPCALL_REPLY = 1
391
392
393 UPCALL_EXPIRE = 2
394 CONFD_UPCALL_TYPES = frozenset([
395 UPCALL_REPLY,
396 UPCALL_EXPIRE,
397 ])
401 """Callback argument for confd replies
402
403 @type salt: string
404 @ivar salt: salt associated with the query
405 @type type: one of confd.client.CONFD_UPCALL_TYPES
406 @ivar type: upcall type (server reply, expired request, ...)
407 @type orig_request: L{objects.ConfdRequest}
408 @ivar orig_request: original request
409 @type server_reply: L{objects.ConfdReply}
410 @ivar server_reply: server reply
411 @type server_ip: string
412 @ivar server_ip: answering server ip address
413 @type server_port: int
414 @ivar server_port: answering server port
415 @type extra_args: any
416 @ivar extra_args: 'args' argument of the SendRequest function
417 @type client: L{ConfdClient}
418 @ivar client: current confd client instance
419
420 """
421 __slots__ = [
422 "salt",
423 "type",
424 "orig_request",
425 "server_reply",
426 "server_ip",
427 "server_port",
428 "extra_args",
429 "client",
430 ]
431
434 """This is the client-side version of ConfdRequest.
435
436 This version of the class helps creating requests, on the client side, by
437 filling in some default values.
438
439 """
448
451 """Callback that calls another callback, but filters duplicate results.
452
453 @ivar consistent: a dictionary indexed by salt; for each salt, if
454 all responses ware identical, this will be True; this is the
455 expected state on a healthy cluster; on inconsistent or
456 partitioned clusters, this might be False, if we see answers
457 with the same serial but different contents
458
459 """
460 - def __init__(self, callback, logger=None):
461 """Constructor for ConfdFilterCallback
462
463 @type callback: f(L{ConfdUpcallPayload})
464 @param callback: function to call when getting answers
465 @type logger: logging.Logger
466 @param logger: optional logger for internal conditions
467
468 """
469 if not callable(callback):
470 raise errors.ProgrammerError("callback must be callable")
471
472 self._callback = callback
473 self._logger = logger
474
475 self._answers = {}
476 self.consistent = {}
477
478 - def _LogFilter(self, salt, new_reply, old_reply):
479 if not self._logger:
480 return
481
482 if new_reply.serial > old_reply.serial:
483 self._logger.debug("Filtering confirming answer, with newer"
484 " serial for query %s" % salt)
485 elif new_reply.serial == old_reply.serial:
486 if new_reply.answer != old_reply.answer:
487 self._logger.warning("Got incoherent answers for query %s"
488 " (serial: %s)" % (salt, new_reply.serial))
489 else:
490 self._logger.debug("Filtering confirming answer, with same"
491 " serial for query %s" % salt)
492 else:
493 self._logger.debug("Filtering outdated answer for query %s"
494 " serial: (%d < %d)" % (salt, old_reply.serial,
495 new_reply.serial))
496
498
499 if up.salt in self._answers:
500 del self._answers[up.salt]
501 if up.salt in self.consistent:
502 del self.consistent[up.salt]
503
505 """Handle a single confd reply, and decide whether to filter it.
506
507 @rtype: boolean
508 @return: True if the reply should be filtered, False if it should be passed
509 on to the up-callback
510
511 """
512 filter_upcall = False
513 salt = up.salt
514 if salt not in self.consistent:
515 self.consistent[salt] = True
516 if salt not in self._answers:
517
518 self._answers[salt] = up.server_reply
519 elif up.server_reply.serial > self._answers[salt].serial:
520
521 old_answer = self._answers[salt]
522 self._answers[salt] = up.server_reply
523 if up.server_reply.answer == old_answer.answer:
524
525 filter_upcall = True
526 self._LogFilter(salt, up.server_reply, old_answer)
527
528 else:
529
530 if (up.server_reply.serial == self._answers[salt].serial and
531 up.server_reply.answer != self._answers[salt].answer):
532 self.consistent[salt] = False
533 filter_upcall = True
534 self._LogFilter(salt, up.server_reply, self._answers[salt])
535
536 return filter_upcall
537
539 """Filtering callback
540
541 @type up: L{ConfdUpcallPayload}
542 @param up: upper callback
543
544 """
545 filter_upcall = False
546 if up.type == UPCALL_REPLY:
547 filter_upcall = self._HandleReply(up)
548 elif up.type == UPCALL_EXPIRE:
549 self._HandleExpire(up)
550
551 if not filter_upcall:
552 self._callback(up)
553
556 """Callback that calls another callback, and counts the answers
557
558 """
559 - def __init__(self, callback, logger=None):
560 """Constructor for ConfdCountingCallback
561
562 @type callback: f(L{ConfdUpcallPayload})
563 @param callback: function to call when getting answers
564 @type logger: logging.Logger
565 @param logger: optional logger for internal conditions
566
567 """
568 if not callable(callback):
569 raise errors.ProgrammerError("callback must be callable")
570
571 self._callback = callback
572 self._logger = logger
573
574 self._answers = {}
575
577 if salt in self._answers:
578 raise errors.ProgrammerError("query already registered")
579 self._answers[salt] = 0
580
582 """Have all the registered queries received at least an answer?
583
584 """
585 return compat.all(self._answers.values())
586
588
589 if up.salt in self._answers:
590 del self._answers[up.salt]
591
593 """Handle a single confd reply, and decide whether to filter it.
594
595 @rtype: boolean
596 @return: True if the reply should be filtered, False if it should be passed
597 on to the up-callback
598
599 """
600 if up.salt in self._answers:
601 self._answers[up.salt] += 1
602
604 """Filtering callback
605
606 @type up: L{ConfdUpcallPayload}
607 @param up: upper callback
608
609 """
610 if up.type == UPCALL_REPLY:
611 self._HandleReply(up)
612 elif up.type == UPCALL_EXPIRE:
613 self._HandleExpire(up)
614 self._callback(up)
615
618 """Callback that simply stores the most recent answer.
619
620 @ivar _answers: dict of salt to (have_answer, reply)
621
622 """
623 _NO_KEY = (False, None)
624
626 """Constructor for StoreResultCallback
627
628 """
629
630 self._answers = {}
631
633 """Return the best match for a salt
634
635 """
636 return self._answers.get(salt, self._NO_KEY)
637
639 """Expiration handler.
640
641 """
642 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
643 del self._answers[up.salt]
644
646 """Handle a single confd reply, and decide whether to filter it.
647
648 """
649 self._answers[up.salt] = (True, up)
650
662
665 """Return a client configured using the given callback.
666
667 This is handy to abstract the MC list and HMAC key reading.
668
669 @attention: This should only be called on nodes which are part of a
670 cluster, since it depends on a valid (ganeti) data directory;
671 for code running outside of a cluster, you need to create the
672 client manually
673
674 """
675 ss = ssconf.SimpleStore()
676 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
677 mc_list = utils.ReadFile(mc_file).splitlines()
678 hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
679 return ConfdClient(hmac_key, mc_list, callback)
680