1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Ganeti confd client
32
33 Clients can use the confd client library to send requests to a group of master
34 candidates running confd. The expected usage is through the asyncore framework,
35 by sending queries, and asynchronously receiving replies through a callback.
36
37 This way the client library doesn't ever need to "wait" on a particular answer,
38 and can proceed even if some udp packets are lost. It's up to the user to
39 reschedule queries if they haven't received responses and they need them.
40
41 Example usage::
42
43 client = ConfdClient(...) # includes callback specification
44 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
45 client.SendRequest(req)
46 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
47 # ... wait ...
48 # And your callback will be called by asyncore, when your query gets a
49 # response, or when it expires.
50
51 You can use the provided ConfdFilterCallback to act as a filter, only passing
52 "newer" answers to your callback, and filtering out outdated ones, or ones
53 confirming what you already got.
54
55 """
56
57
58
59
60
61
62 import time
63 import random
64
65 from ganeti import utils
66 from ganeti import constants
67 from ganeti import objects
68 from ganeti import serializer
69 from ganeti import daemon
70 from ganeti import errors
71 from ganeti import confd
72 from ganeti import ssconf
73 from ganeti import compat
74 from ganeti import netutils
75 from ganeti import pathutils
79 """Confd udp asyncore client
80
81 This is kept separate from the main ConfdClient to make sure it's easy to
82 implement a non-asyncore based client library.
83
84 """
94
95
98
101 """Request status structure.
102
103 @ivar request: the request data
104 @ivar args: any extra arguments for the callback
105 @ivar expiry: the expiry timestamp of the request
106 @ivar sent: the set of contacted peers
107 @ivar rcvd: the set of peers who replied
108
109 """
def __init__(self, request, args, expiry, sent):
  """Record the bookkeeping data of one outstanding request.

  @param request: the request data
  @param args: any extra arguments for the callback
  @param expiry: the expiry timestamp of the request
  @param sent: iterable of the contacted peers; stored as a frozenset

  """
  self.request, self.args, self.expiry = (request, args, expiry)
  # The contacted peers never change, while the set of peers who replied
  # starts out empty and is filled in later
  self.sent, self.rcvd = frozenset(sent), set()
116
119 """Send queries to confd, and get back answers.
120
121 Since the confd model works by querying multiple master candidates, and
122 getting back answers, this is an asynchronous library. It can either work
123 through asyncore or with your own handling.
124
125 @type _requests: dict
126 @ivar _requests: dictionary indexed by salt, which contains data
127 about the outstanding requests; the values are objects of type
128 L{_Request}
129
130 """
def __init__(self, hmac_key, peers, callback, port=None, logger=None):
  """Set up a confd client talking to the given peers.

  @type hmac_key: string
  @param hmac_key: hmac key to talk to confd
  @type peers: list
  @param peers: list of peer nodes
  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type port: integer
  @param port: confd port (default: use GetDaemonPort)
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if the callback is not callable

  """
  callback_ok = callable(callback)
  if not callback_ok:
    raise errors.ProgrammerError("callback must be callable")

  # NOTE(review): self._family is presumably computed from the peer list by
  # _SetPeersAddressFamily, hence the call order below -- confirm
  self.UpdatePeerList(peers)
  self._SetPeersAddressFamily()
  self._hmac_key = hmac_key
  self._socket = ConfdAsyncUDPClient(self, self._family)
  self._callback = callback

  # Use the port reported by GetDaemonPort when the caller did not pick one
  self._confd_port = port
  self._logger = logger
  # Outstanding requests, indexed by salt
  self._requests = {}

  if port is None:
    self._confd_port = netutils.GetDaemonPort(constants.CONFD)
160
162 """Update the list of peers
163
164 @type peers: list
165 @param peers: list of peer nodes
166
167 """
168
169
170 if not isinstance(peers, list):
171 raise errors.ProgrammerError("peers must be a list")
172
173 self._peers = list(peers)
174
176 """Prepare a request to be sent on the wire.
177
178 This function signs the serialized request with the HMAC key, using a
179 timestamp as the salt, and adds the correct magic number.
180
181 """
182 if now is None:
183 now = time.time()
184 tstamp = "%d" % now
185 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
186 return confd.PackMagic(req)
187
193
195 """Delete all the expired requests.
196
197 """
198 now = time.time()
199 for rsalt, rq in self._requests.items():
200 if now >= rq.expiry:
201 del self._requests[rsalt]
202 client_reply = ConfdUpcallPayload(salt=rsalt,
203 type=UPCALL_EXPIRE,
204 orig_request=rq.request,
205 extra_args=rq.args,
206 client=self,
207 )
208 self._callback(client_reply)
209
210 - def SendRequest(self, request, args=None, coverage=0, async=True):
211 """Send a confd request to some MCs
212
213 @type request: L{objects.ConfdRequest}
214 @param request: the request to send
215 @type args: tuple
216 @param args: additional callback arguments
217 @type coverage: integer
218 @param coverage: number of remote nodes to contact; if default
219 (0), it will use a reasonable default
220 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
221 passed, it will use the maximum number of peers, otherwise the
222 number passed in will be used
223 @type async: boolean
224 @param async: handle the write asynchronously
225
226 """
227 if coverage == 0:
228 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
229 elif coverage == -1:
230 coverage = len(self._peers)
231
232 if coverage > len(self._peers):
233 raise errors.ConfdClientError("Not enough MCs known to provide the"
234 " desired coverage")
235
236 if not request.rsalt:
237 raise errors.ConfdClientError("Missing request rsalt")
238
239 self.ExpireRequests()
240 if request.rsalt in self._requests:
241 raise errors.ConfdClientError("Duplicate request rsalt")
242
243 if request.type not in constants.CONFD_REQS:
244 raise errors.ConfdClientError("Invalid request type")
245
246 random.shuffle(self._peers)
247 targets = self._peers[:coverage]
248
249 now = time.time()
250 payload = self._PackRequest(request, now=now)
251
252 for target in targets:
253 try:
254 self._socket.enqueue_send(target, self._confd_port, payload)
255 except errors.UdpDataSizeError:
256 raise errors.ConfdClientError("Request too big")
257
258 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
259 self._requests[request.rsalt] = _Request(request, args, expire_time,
260 targets)
261
262 if not async:
263 self.FlushSendQueue()
264
266 """Asynchronous handler for a confd reply
267
268 Call the relevant callback associated to the current request.
269
270 """
271 try:
272 try:
273 answer, salt = self._UnpackReply(payload)
274 except (errors.SignatureError, errors.ConfdMagicError), err:
275 if self._logger:
276 self._logger.debug("Discarding broken package: %s" % err)
277 return
278
279 try:
280 rq = self._requests[salt]
281 except KeyError:
282 if self._logger:
283 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
284 return
285
286 rq.rcvd.add(ip)
287
288 client_reply = ConfdUpcallPayload(salt=salt,
289 type=UPCALL_REPLY,
290 server_reply=answer,
291 orig_request=rq.request,
292 server_ip=ip,
293 server_port=port,
294 extra_args=rq.args,
295 client=self,
296 )
297 self._callback(client_reply)
298
299 finally:
300 self.ExpireRequests()
301
303 """Send out all pending requests.
304
305 Can be used for synchronous client use.
306
307 """
308 while self._socket.writable():
309 self._socket.handle_write()
310
312 """Receive one reply.
313
314 @type timeout: float
315 @param timeout: how long to wait for the reply
316 @rtype: boolean
317 @return: True if some data has been handled, False otherwise
318
319 """
320 return self._socket.process_next_packet(timeout=timeout)
321
322 @staticmethod
324 """Compute the minimum safe number of replies for a query.
325
326 The algorithm is designed to work well for both small and big
327 number of peers:
328 - for less than three, we require all responses
329 - for less than five, we allow one miss
330 - otherwise, half the number plus one
331
332 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
333 4->3, 5->3, 6->4, 7->4, etc.
334
335 @type peer_cnt: int
336 @param peer_cnt: the number of peers contacted
337 @rtype: int
338 @return: the number of replies which should give a safe coverage
339
340 """
341 if peer_cnt < 3:
342 return peer_cnt
343 elif peer_cnt < 5:
344 return peer_cnt - 1
345 else:
346 return int(peer_cnt / 2) + 1
347
349 """Wait for replies to a given request.
350
351 This method will wait until either the timeout expires or a
352 minimum number (computed using L{_NeededReplies}) of replies are
353 received for the given salt. It is useful when doing synchronous
354 calls to this library.
355
356 @param salt: the salt of the request we want responses for
357 @param timeout: the maximum timeout (should be less than or equal to
358 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT})
359 @rtype: tuple
360 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
361 request is unknown, timed_out will be true and the counters
362 will be zero
363
364 """
365 def _CheckResponse():
366 if salt not in self._requests:
367
368 if self._logger:
369 self._logger.debug("Discarding unknown/expired request: %s" % salt)
370 return MISSING
371 rq = self._requests[salt]
372 if len(rq.rcvd) >= expected:
373
374 return (False, len(rq.sent), len(rq.rcvd))
375
376 self.ReceiveReply()
377 raise utils.RetryAgain()
378
379 MISSING = (True, 0, 0)
380
381 if salt not in self._requests:
382 return MISSING
383
384
385 rq = self._requests[salt]
386 rq.expiry += timeout
387 sent = len(rq.sent)
388 expected = self._NeededReplies(sent)
389
390 try:
391 return utils.Retry(_CheckResponse, 0, timeout)
392 except utils.RetryTimeout:
393 if salt in self._requests:
394 rq = self._requests[salt]
395 return (True, len(rq.sent), len(rq.rcvd))
396 else:
397 return MISSING
398
410
411
412
413
# Upcall type: a confd server answered one of our requests
UPCALL_REPLY = 1

# Upcall type: a request expired
UPCALL_EXPIRE = 2

# All the upcall types a callback can be invoked with
CONFD_UPCALL_TYPES = compat.UniqueFrozenset([UPCALL_REPLY, UPCALL_EXPIRE])
425 """Callback argument for confd replies
426
427 @type salt: string
428 @ivar salt: salt associated with the query
429 @type type: one of confd.client.CONFD_UPCALL_TYPES
430 @ivar type: upcall type (server reply, expired request, ...)
431 @type orig_request: L{objects.ConfdRequest}
432 @ivar orig_request: original request
433 @type server_reply: L{objects.ConfdReply}
434 @ivar server_reply: server reply
435 @type server_ip: string
436 @ivar server_ip: answering server ip address
437 @type server_port: int
438 @ivar server_port: answering server port
439 @type extra_args: any
440 @ivar extra_args: 'args' argument of the SendRequest function
441 @type client: L{ConfdClient}
442 @ivar client: current confd client instance
443
444 """
445 __slots__ = [
446 "salt",
447 "type",
448 "orig_request",
449 "server_reply",
450 "server_ip",
451 "server_port",
452 "extra_args",
453 "client",
454 ]
455
458 """This is the client-side version of ConfdRequest.
459
460 This version of the class helps creating requests, on the client side, by
461 filling in some default values.
462
463 """
472
475 """Callback that calls another callback, but filters duplicate results.
476
477 @ivar consistent: a dictionary indexed by salt; for each salt, if
478 all responses were identical, this will be True; this is the
479 expected state on a healthy cluster; on inconsistent or
480 partitioned clusters, this might be False, if we see answers
481 with the same serial but different contents
482
483 """
def __init__(self, callback, logger=None):
  """Wrap a callback so that redundant answers can be filtered out.

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if the callback is not callable

  """
  callback_ok = callable(callback)
  if not callback_ok:
    raise errors.ProgrammerError("callback must be callable")

  self._callback, self._logger = callback, logger

  # Both dictionaries are indexed by salt: the stored answer per query, and
  # whether the answers seen for that query were consistent
  self._answers, self.consistent = {}, {}
501
def _LogFilter(self, salt, new_reply, old_reply):
  """Log the reason why a reply for the given query is being filtered.

  Nothing is done if no logger was configured.

  @param salt: salt of the query the replies belong to
  @param new_reply: the reply being filtered; its C{serial} and C{answer}
      attributes are compared against the old reply
  @param old_reply: the reply the new one is compared against

  """
  if not self._logger:
    return

  if new_reply.serial > old_reply.serial:
    self._logger.debug("Filtering confirming answer, with newer"
                       " serial for query %s", salt)
  elif new_reply.serial == old_reply.serial:
    if new_reply.answer != old_reply.answer:
      # Same serial but different contents
      self._logger.warning("Got incoherent answers for query %s"
                           " (serial: %s)", salt, new_reply.serial)
    else:
      self._logger.debug("Filtering confirming answer, with same"
                         " serial for query %s", salt)
  else:
    # Here new_reply.serial < old_reply.serial, so the new serial must come
    # first for the logged inequality to be true
    self._logger.debug("Filtering outdated answer for query %s"
                       " serial: (%d < %d)", salt, new_reply.serial,
                       old_reply.serial)
520
522
523 if up.salt in self._answers:
524 del self._answers[up.salt]
525 if up.salt in self.consistent:
526 del self.consistent[up.salt]
527
529 """Handle a single confd reply, and decide whether to filter it.
530
531 @rtype: boolean
532 @return: True if the reply should be filtered, False if it should be passed
533 on to the up-callback
534
535 """
536 filter_upcall = False
537 salt = up.salt
538 if salt not in self.consistent:
539 self.consistent[salt] = True
540 if salt not in self._answers:
541
542 self._answers[salt] = up.server_reply
543 elif up.server_reply.serial > self._answers[salt].serial:
544
545 old_answer = self._answers[salt]
546 self._answers[salt] = up.server_reply
547 if up.server_reply.answer == old_answer.answer:
548
549 filter_upcall = True
550 self._LogFilter(salt, up.server_reply, old_answer)
551
552 else:
553
554 if (up.server_reply.serial == self._answers[salt].serial and
555 up.server_reply.answer != self._answers[salt].answer):
556 self.consistent[salt] = False
557 filter_upcall = True
558 self._LogFilter(salt, up.server_reply, self._answers[salt])
559
560 return filter_upcall
561
563 """Filtering callback
564
565 @type up: L{ConfdUpcallPayload}
566 @param up: the upcall payload to handle
567
568 """
569 filter_upcall = False
570 if up.type == UPCALL_REPLY:
571 filter_upcall = self._HandleReply(up)
572 elif up.type == UPCALL_EXPIRE:
573 self._HandleExpire(up)
574
575 if not filter_upcall:
576 self._callback(up)
577
580 """Callback that calls another callback, and counts the answers
581
582 """
def __init__(self, callback, logger=None):
  """Wrap a callback so that the answers passed to it can be counted.

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @type logger: logging.Logger
  @param logger: optional logger for internal conditions
  @raise errors.ProgrammerError: if the callback is not callable

  """
  callback_ok = callable(callback)
  if not callback_ok:
    raise errors.ProgrammerError("callback must be callable")

  self._callback, self._logger = callback, logger

  # Per-salt counters, empty until queries are registered
  self._answers = dict()
599
601 if salt in self._answers:
602 raise errors.ProgrammerError("query already registered")
603 self._answers[salt] = 0
604
606 """Have all the registered queries received at least an answer?
607
608 """
609 return compat.all(self._answers.values())
610
612
613 if up.salt in self._answers:
614 del self._answers[up.salt]
615
617 """Handle a single confd reply, and decide whether to filter it.
618
619 @rtype: boolean
620 @return: True if the reply should be filtered, False if it should be passed
621 on to the up-callback
622
623 """
624 if up.salt in self._answers:
625 self._answers[up.salt] += 1
626
628 """Filtering callback
629
630 @type up: L{ConfdUpcallPayload}
631 @param up: the upcall payload to handle
632
633 """
634 if up.type == UPCALL_REPLY:
635 self._HandleReply(up)
636 elif up.type == UPCALL_EXPIRE:
637 self._HandleExpire(up)
638 self._callback(up)
639
642 """Callback that simply stores the most recent answer.
643
644 @ivar _answers: dict of salt to (have_answer, reply)
645
646 """
647 _NO_KEY = (False, None)
648
650 """Constructor for StoreResultCallback
651
652 """
653
654 self._answers = {}
655
657 """Return the best match for a salt
658
659 """
660 return self._answers.get(salt, self._NO_KEY)
661
663 """Expiration handler.
664
665 """
666 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
667 del self._answers[up.salt]
668
670 """Handle a single confd reply, and decide whether to filter it.
671
672 """
673 self._answers[up.salt] = (True, up)
674
686
689 """Return a client configured using the given callback.
690
691 This is handy to abstract the MC list and HMAC key reading.
692
693 @attention: This should only be called on nodes which are part of a
694 cluster, since it depends on a valid (ganeti) data directory;
695 for code running outside of a cluster, you need to create the
696 client manually
697
698 """
699 ss = ssconf.SimpleStore()
700 mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
701 mc_list = utils.ReadFile(mc_file).splitlines()
702 hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
703 return ConfdClient(hmac_key, mc_list, callback)
704