1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 # ... wait ...
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48
49
50
51
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
69 """Confd udp asyncore client
70
71 This is kept separate from the main ConfdClient to make sure it's easy to
72 implement a non-asyncore based client library.
73
74 """
84
85
88
91 """Request status structure.
92
93 @ivar request: the request data
94 @ivar args: any extra arguments for the callback
95 @ivar expiry: the expiry timestamp of the request
96 @ivar sent: the set of contacted peers
97 @ivar rcvd: the set of peers who replied
98
99 """
100 - def __init__(self, request, args, expiry, sent):
101 self.request = request
102 self.args = args
103 self.expiry = expiry
104 self.sent = frozenset(sent)
105 self.rcvd = set()
106
109 """Send queries to confd, and get back answers.
110
111 Since the confd model works by querying multiple master candidates, and
112 getting back answers, this is an asynchronous library. It can either work
113 through asyncore or with your own handling.
114
115 @type _requests: dict
116 @ivar _requests: dictionary indexes by salt, which contains data
117 about the outstanding requests; the values are objects of type
118 L{_Request}
119
120 """
121 - def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122 """Constructor for ConfdClient
123
124 @type hmac_key: string
125 @param hmac_key: hmac key to talk to confd
126 @type peers: list
127 @param peers: list of peer nodes
128 @type callback: f(L{ConfdUpcallPayload})
129 @param callback: function to call when getting answers
130 @type port: integer
131 @param port: confd port (default: use GetDaemonPort)
132 @type logger: logging.Logger
133 @param logger: optional logger for internal conditions
134
135 """
136 if not callable(callback):
137 raise errors.ProgrammerError("callback must be callable")
138
139 self.UpdatePeerList(peers)
140 self._SetPeersAddressFamily()
141 self._hmac_key = hmac_key
142 self._socket = ConfdAsyncUDPClient(self, self._family)
143 self._callback = callback
144 self._confd_port = port
145 self._logger = logger
146 self._requests = {}
147
148 if self._confd_port is None:
149 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
150
152 """Update the list of peers
153
154 @type peers: list
155 @param peers: list of peer nodes
156
157 """
158
159
160 if not isinstance(peers, list):
161 raise errors.ProgrammerError("peers must be a list")
162
163 self._peers = list(peers)
164
166 """Prepare a request to be sent on the wire.
167
168 This function puts a proper salt in a confd request, puts the proper salt,
169 and adds the correct magic number.
170
171 """
172 if now is None:
173 now = time.time()
174 tstamp = '%d' % now
175 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
176 return confd.PackMagic(req)
177
183
185 """Delete all the expired requests.
186
187 """
188 now = time.time()
189 for rsalt, rq in self._requests.items():
190 if now >= rq.expiry:
191 del self._requests[rsalt]
192 client_reply = ConfdUpcallPayload(salt=rsalt,
193 type=UPCALL_EXPIRE,
194 orig_request=rq.request,
195 extra_args=rq.args,
196 client=self,
197 )
198 self._callback(client_reply)
199
200 - def SendRequest(self, request, args=None, coverage=0, async=True):
201 """Send a confd request to some MCs
202
203 @type request: L{objects.ConfdRequest}
204 @param request: the request to send
205 @type args: tuple
206 @param args: additional callback arguments
207 @type coverage: integer
208 @param coverage: number of remote nodes to contact; if default
209 (0), it will use a reasonable default
210 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
211 passed, it will use the maximum number of peers, otherwise the
212 number passed in will be used
213 @type async: boolean
214 @param async: handle the write asynchronously
215
216 """
217 if coverage == 0:
218 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
219 elif coverage == -1:
220 coverage = len(self._peers)
221
222 if coverage > len(self._peers):
223 raise errors.ConfdClientError("Not enough MCs known to provide the"
224 " desired coverage")
225
226 if not request.rsalt:
227 raise errors.ConfdClientError("Missing request rsalt")
228
229 self.ExpireRequests()
230 if request.rsalt in self._requests:
231 raise errors.ConfdClientError("Duplicate request rsalt")
232
233 if request.type not in constants.CONFD_REQS:
234 raise errors.ConfdClientError("Invalid request type")
235
236 random.shuffle(self._peers)
237 targets = self._peers[:coverage]
238
239 now = time.time()
240 payload = self._PackRequest(request, now=now)
241
242 for target in targets:
243 try:
244 self._socket.enqueue_send(target, self._confd_port, payload)
245 except errors.UdpDataSizeError:
246 raise errors.ConfdClientError("Request too big")
247
248 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
249 self._requests[request.rsalt] = _Request(request, args, expire_time,
250 targets)
251
252 if not async:
253 self.FlushSendQueue()
254
256 """Asynchronous handler for a confd reply
257
258 Call the relevant callback associated to the current request.
259
260 """
261 try:
262 try:
263 answer, salt = self._UnpackReply(payload)
264 except (errors.SignatureError, errors.ConfdMagicError), err:
265 if self._logger:
266 self._logger.debug("Discarding broken package: %s" % err)
267 return
268
269 try:
270 rq = self._requests[salt]
271 except KeyError:
272 if self._logger:
273 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
274 return
275
276 rq.rcvd.add(ip)
277
278 client_reply = ConfdUpcallPayload(salt=salt,
279 type=UPCALL_REPLY,
280 server_reply=answer,
281 orig_request=rq.request,
282 server_ip=ip,
283 server_port=port,
284 extra_args=rq.args,
285 client=self,
286 )
287 self._callback(client_reply)
288
289 finally:
290 self.ExpireRequests()
291
293 """Send out all pending requests.
294
295 Can be used for synchronous client use.
296
297 """
298 while self._socket.writable():
299 self._socket.handle_write()
300
302 """Receive one reply.
303
304 @type timeout: float
305 @param timeout: how long to wait for the reply
306 @rtype: boolean
307 @return: True if some data has been handled, False otherwise
308
309 """
310 return self._socket.process_next_packet(timeout=timeout)
311
312 @staticmethod
314 """Compute the minimum safe number of replies for a query.
315
316 The algorithm is designed to work well for both small and big
317 number of peers:
318 - for less than three, we require all responses
319 - for less than five, we allow one miss
320 - otherwise, half the number plus one
321
322 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
323 4->2, 5->3, 6->3, 7->4, etc.
324
325 @type peer_cnt: int
326 @param peer_cnt: the number of peers contacted
327 @rtype: int
328 @return: the number of replies which should give a safe coverage
329
330 """
331 if peer_cnt < 3:
332 return peer_cnt
333 elif peer_cnt < 5:
334 return peer_cnt - 1
335 else:
336 return int(peer_cnt / 2) + 1
337
339 """Wait for replies to a given request.
340
341 This method will wait until either the timeout expires or a
342 minimum number (computed using L{_NeededReplies}) of replies are
343 received for the given salt. It is useful when doing synchronous
344 calls to this library.
345
346 @param salt: the salt of the request we want responses for
347 @param timeout: the maximum timeout (should be less or equal to
348 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
349 @rtype: tuple
350 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
351 request is unknown, timed_out will be true and the counters
352 will be zero
353
354 """
355 def _CheckResponse():
356 if salt not in self._requests:
357
358 if self._logger:
359 self._logger.debug("Discarding unknown/expired request: %s" % salt)
360 return MISSING
361 rq = self._requests[salt]
362 if len(rq.rcvd) >= expected:
363
364 return (False, len(rq.sent), len(rq.rcvd))
365
366 self.ReceiveReply()
367 raise utils.RetryAgain()
368
369 MISSING = (True, 0, 0)
370
371 if salt not in self._requests:
372 return MISSING
373
374
375 rq = self._requests[salt]
376 rq.expiry += timeout
377 sent = len(rq.sent)
378 expected = self._NeededReplies(sent)
379
380 try:
381 return utils.Retry(_CheckResponse, 0, timeout)
382 except utils.RetryTimeout:
383 if salt in self._requests:
384 rq = self._requests[salt]
385 return (True, len(rq.sent), len(rq.rcvd))
386 else:
387 return MISSING
388
400
401
402
403
404 UPCALL_REPLY = 1
405
406
407 UPCALL_EXPIRE = 2
408 CONFD_UPCALL_TYPES = frozenset([
409 UPCALL_REPLY,
410 UPCALL_EXPIRE,
411 ])
415 """Callback argument for confd replies
416
417 @type salt: string
418 @ivar salt: salt associated with the query
419 @type type: one of confd.client.CONFD_UPCALL_TYPES
420 @ivar type: upcall type (server reply, expired request, ...)
421 @type orig_request: L{objects.ConfdRequest}
422 @ivar orig_request: original request
423 @type server_reply: L{objects.ConfdReply}
424 @ivar server_reply: server reply
425 @type server_ip: string
426 @ivar server_ip: answering server ip address
427 @type server_port: int
428 @ivar server_port: answering server port
429 @type extra_args: any
430 @ivar extra_args: 'args' argument of the SendRequest function
431 @type client: L{ConfdClient}
432 @ivar client: current confd client instance
433
434 """
435 __slots__ = [
436 "salt",
437 "type",
438 "orig_request",
439 "server_reply",
440 "server_ip",
441 "server_port",
442 "extra_args",
443 "client",
444 ]
445
448 """This is the client-side version of ConfdRequest.
449
450 This version of the class helps creating requests, on the client side, by
451 filling in some default values.
452
453 """
462
465 """Callback that calls another callback, but filters duplicate results.
466
467 @ivar consistent: a dictionary indexed by salt; for each salt, if
468 all responses ware identical, this will be True; this is the
469 expected state on a healthy cluster; on inconsistent or
470 partitioned clusters, this might be False, if we see answers
471 with the same serial but different contents
472
473 """
474 - def __init__(self, callback, logger=None):
475 """Constructor for ConfdFilterCallback
476
477 @type callback: f(L{ConfdUpcallPayload})
478 @param callback: function to call when getting answers
479 @type logger: logging.Logger
480 @param logger: optional logger for internal conditions
481
482 """
483 if not callable(callback):
484 raise errors.ProgrammerError("callback must be callable")
485
486 self._callback = callback
487 self._logger = logger
488
489 self._answers = {}
490 self.consistent = {}
491
492 - def _LogFilter(self, salt, new_reply, old_reply):
493 if not self._logger:
494 return
495
496 if new_reply.serial > old_reply.serial:
497 self._logger.debug("Filtering confirming answer, with newer"
498 " serial for query %s" % salt)
499 elif new_reply.serial == old_reply.serial:
500 if new_reply.answer != old_reply.answer:
501 self._logger.warning("Got incoherent answers for query %s"
502 " (serial: %s)" % (salt, new_reply.serial))
503 else:
504 self._logger.debug("Filtering confirming answer, with same"
505 " serial for query %s" % salt)
506 else:
507 self._logger.debug("Filtering outdated answer for query %s"
508 " serial: (%d < %d)" % (salt, old_reply.serial,
509 new_reply.serial))
510
512
513 if up.salt in self._answers:
514 del self._answers[up.salt]
515 if up.salt in self.consistent:
516 del self.consistent[up.salt]
517
519 """Handle a single confd reply, and decide whether to filter it.
520
521 @rtype: boolean
522 @return: True if the reply should be filtered, False if it should be passed
523 on to the up-callback
524
525 """
526 filter_upcall = False
527 salt = up.salt
528 if salt not in self.consistent:
529 self.consistent[salt] = True
530 if salt not in self._answers:
531
532 self._answers[salt] = up.server_reply
533 elif up.server_reply.serial > self._answers[salt].serial:
534
535 old_answer = self._answers[salt]
536 self._answers[salt] = up.server_reply
537 if up.server_reply.answer == old_answer.answer:
538
539 filter_upcall = True
540 self._LogFilter(salt, up.server_reply, old_answer)
541
542 else:
543
544 if (up.server_reply.serial == self._answers[salt].serial and
545 up.server_reply.answer != self._answers[salt].answer):
546 self.consistent[salt] = False
547 filter_upcall = True
548 self._LogFilter(salt, up.server_reply, self._answers[salt])
549
550 return filter_upcall
551
553 """Filtering callback
554
555 @type up: L{ConfdUpcallPayload}
556 @param up: upper callback
557
558 """
559 filter_upcall = False
560 if up.type == UPCALL_REPLY:
561 filter_upcall = self._HandleReply(up)
562 elif up.type == UPCALL_EXPIRE:
563 self._HandleExpire(up)
564
565 if not filter_upcall:
566 self._callback(up)
567
570 """Callback that calls another callback, and counts the answers
571
572 """
573 - def __init__(self, callback, logger=None):
574 """Constructor for ConfdCountingCallback
575
576 @type callback: f(L{ConfdUpcallPayload})
577 @param callback: function to call when getting answers
578 @type logger: logging.Logger
579 @param logger: optional logger for internal conditions
580
581 """
582 if not callable(callback):
583 raise errors.ProgrammerError("callback must be callable")
584
585 self._callback = callback
586 self._logger = logger
587
588 self._answers = {}
589
591 if salt in self._answers:
592 raise errors.ProgrammerError("query already registered")
593 self._answers[salt] = 0
594
596 """Have all the registered queries received at least an answer?
597
598 """
599 return compat.all(self._answers.values())
600
602
603 if up.salt in self._answers:
604 del self._answers[up.salt]
605
607 """Handle a single confd reply, and decide whether to filter it.
608
609 @rtype: boolean
610 @return: True if the reply should be filtered, False if it should be passed
611 on to the up-callback
612
613 """
614 if up.salt in self._answers:
615 self._answers[up.salt] += 1
616
618 """Filtering callback
619
620 @type up: L{ConfdUpcallPayload}
621 @param up: upper callback
622
623 """
624 if up.type == UPCALL_REPLY:
625 self._HandleReply(up)
626 elif up.type == UPCALL_EXPIRE:
627 self._HandleExpire(up)
628 self._callback(up)
629
632 """Callback that simply stores the most recent answer.
633
634 @ivar _answers: dict of salt to (have_answer, reply)
635
636 """
637 _NO_KEY = (False, None)
638
640 """Constructor for StoreResultCallback
641
642 """
643
644 self._answers = {}
645
647 """Return the best match for a salt
648
649 """
650 return self._answers.get(salt, self._NO_KEY)
651
653 """Expiration handler.
654
655 """
656 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
657 del self._answers[up.salt]
658
660 """Handle a single confd reply, and decide whether to filter it.
661
662 """
663 self._answers[up.salt] = (True, up)
664
676
679 """Return a client configured using the given callback.
680
681 This is handy to abstract the MC list and HMAC key reading.
682
683 @attention: This should only be called on nodes which are part of a
684 cluster, since it depends on a valid (ganeti) data directory;
685 for code running outside of a cluster, you need to create the
686 client manually
687
688 """
689 ss = ssconf.SimpleStore()
690 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
691 mc_list = utils.ReadFile(mc_file).splitlines()
692 hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
693 return ConfdClient(hmac_key, mc_list, callback)
694