1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50
51
52 from ganeti import _generated_rpc
53
54
55 import ganeti.http.client
56
57
58
# Connection timeout used for RPC requests (presumably in seconds; the
# consumer of this value is outside this chunk -- TODO confirm)
_RPC_CONNECT_TIMEOUT = 5

# Extra HTTP headers passed with every RPC client request (see the
# "headers" argument given to HttpClientRequest when requests are prepared).
# NOTE(review): the empty "Expect:" entry looks like the usual way of keeping
# cURL from sending "Expect: 100-continue" -- confirm before changing it.
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]
65
66
# Timeout classes, from shortest to longest (presumably seconds, judging by
# the names -- the users of these values are outside this chunk)
_TMO_URGENT = 60
_TMO_FAST = 300
_TMO_NORMAL = 900
_TMO_SLOW = 60 * 60
_TMO_4HRS = 4 * 60 * 60
_TMO_1DAY = 24 * 60 * 60

# Unique marker used in place of an IP address for nodes that are offline;
# always compared by identity ("is _OFFLINE"), never by value
_OFFLINE = object()
79 """Initializes the module-global HTTP client manager.
80
81 Must be called before using any RPC function and while exactly one thread is
82 running.
83
84 """
85
86
87
88 assert threading.activeCount() == 1, \
89 "Found more than one active thread when initializing pycURL"
90
91 logging.info("Using PycURL %s", pycurl.version)
92
93 pycurl.global_init(pycurl.GLOBAL_ALL)
94
97 """Stops the module-global HTTP client manager.
98
99 Must be called before quitting the program and while exactly one thread is
100 running.
101
102 """
103 pycurl.global_cleanup()
104
118
121 """RPC-wrapper decorator.
122
123 When applied to a function, it runs it with the RPC system
124 initialized, and it shuts down the system afterwards. This means the
125 function must be called without RPC being initialized.
126
127 """
128 def wrapper(*args, **kwargs):
129 Init()
130 try:
131 return fn(*args, **kwargs)
132 finally:
133 Shutdown()
134 return wrapper
135
138 """Compresses a string for transport over RPC.
139
140 Small amounts of data are not compressed.
141
142 @type data: str
143 @param data: Data
144 @rtype: tuple
145 @return: Encoded data to send
146
147 """
148
149 if len(data) < 512:
150 return (constants.RPC_ENCODING_NONE, data)
151
152
153 return (constants.RPC_ENCODING_ZLIB_BASE64,
154 base64.b64encode(zlib.compress(data, 3)))
155
158 """RPC Result class.
159
160 This class holds an RPC result. It is needed since in multi-node
161 calls we can't raise an exception just because one out of many
162 failed, and therefore we use this class to encapsulate the result.
163
164 @ivar data: the data payload, for successful results, or None
165 @ivar call: the name of the RPC call
166 @ivar node: the name of the node to which we made the call
167 @ivar offline: whether the operation failed because the node was
168 offline, as opposed to actual failure; offline=True will always
169 imply failed=True, in order to allow simpler checking if
170 the user doesn't care about the exact failure mode
171 @ivar fail_msg: the error message if the call failed
172
173 """
174 - def __init__(self, data=None, failed=False, offline=False,
175 call=None, node=None):
176 self.offline = offline
177 self.call = call
178 self.node = node
179
180 if offline:
181 self.fail_msg = "Node is marked offline"
182 self.data = self.payload = None
183 elif failed:
184 self.fail_msg = self._EnsureErr(data)
185 self.data = self.payload = None
186 else:
187 self.data = data
188 if not isinstance(self.data, (tuple, list)):
189 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190 type(self.data))
191 self.payload = None
192 elif len(data) != 2:
193 self.fail_msg = ("RPC layer error: invalid result length (%d), "
194 "expected 2" % len(self.data))
195 self.payload = None
196 elif not self.data[0]:
197 self.fail_msg = self._EnsureErr(self.data[1])
198 self.payload = None
199 else:
200
201 self.fail_msg = None
202 self.payload = data[1]
203
204 for attr_name in ["call", "data", "fail_msg",
205 "node", "offline", "payload"]:
206 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207
208 @staticmethod
210 """Helper to ensure we return a 'True' value for error."""
211 if val:
212 return val
213 else:
214 return "No error information"
215
def Raise(self, msg, prereq=False, ecode=None):
  """Raises an exception if this result represents a failed call.

  Lets callers validate a result with a single call instead of
  inspecting C{fail_msg} themselves. Returns silently when the call
  succeeded.

  @param msg: prefix for the error text; if empty or C{None} a generic
    message naming the call and the node is used instead
  @type prereq: bool
  @param prereq: raise L{errors.OpPrereqError} instead of
    L{errors.OpExecError}
  @param ecode: if not C{None}, passed to the exception as second
    argument
  @raise errors.OpPrereqError: on failure, if C{prereq} is true
  @raise errors.OpExecError: on failure, if C{prereq} is false

  """
  if not self.fail_msg:
    return

  if msg:
    text = "%s: %s" % (msg, self.fail_msg)
  else:
    text = ("Call '%s' to node '%s' has failed: %s" %
            (self.call, self.node, self.fail_msg))

  if not prereq:
    exc_class = errors.OpExecError
  else:
    exc_class = errors.OpPrereqError

  # The error code is only handed over when the caller supplied one
  if ecode is None:
    raise exc_class(text)

  raise exc_class(text, ecode)
240
245 """Return addresses for given node names.
246
247 @type ssconf_ips: bool
248 @param ssconf_ips: Use the ssconf IPs
249 @type node_list: list
250 @param node_list: List of node names
251 @type ssc: class
252 @param ssc: SimpleStore class that is used to obtain node->ip mappings
253 @type nslookup_fn: callable
253 @param nslookup_fn: function used to do NS lookup
255 @rtype: list of tuple; (string, string)
256 @return: List of tuples containing node name and IP address
257
258 """
259 ss = ssc()
260 family = ss.GetPrimaryIPFamily()
261
262 if ssconf_ips:
263 iplist = ss.GetNodePrimaryIPList()
264 ipmap = dict(entry.split() for entry in iplist)
265 else:
266 ipmap = {}
267
268 result = []
269 for node in node_list:
270 ip = ipmap.get(node)
271 if ip is None:
272 ip = nslookup_fn(node, family=family)
273 result.append((node, ip))
274
275 return result
276
280 """Initializes this class.
281
282 """
283 self._addresses = addresses
284
286 """Returns static addresses for hosts.
287
288 """
289 assert len(hosts) == len(self._addresses)
290 return zip(hosts, self._addresses)
291
294 """Checks if a node is online.
295
296 @type name: string
297 @param name: Node name
298 @type node: L{objects.Node} or None
299 @param node: Node object
300
301 """
302 if node is None:
303
304 ip = name
305 elif node.offline and not accept_offline_node:
306 ip = _OFFLINE
307 else:
308 ip = node.primary_ip
309 return (name, ip)
310
329
332 - def __init__(self, resolver, port, lock_monitor_cb=None):
333 """Initializes this class.
334
335 @param resolver: callable accepting a list of hostnames, returning a list
336 of tuples containing name and IP address (IP address can be the name or
337 the special value L{_OFFLINE} to mark offline machines)
338 @type port: int
339 @param port: TCP port
340 @param lock_monitor_cb: Callable for registering with lock monitor
341
342 """
343 self._resolver = resolver
344 self._port = port
345 self._lock_monitor_cb = lock_monitor_cb
346
347 @staticmethod
349 """Prepares requests by sorting offline hosts into separate list.
350
351 @type body: dict
352 @param body: a dictionary with per-host body data
353
354 """
355 results = {}
356 requests = {}
357
358 assert isinstance(body, dict)
359 assert len(body) == len(hosts)
360 assert compat.all(isinstance(v, str) for v in body.values())
361 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
362 "%s != %s" % (hosts, body.keys())
363
364 for (name, ip) in hosts:
365 if ip is _OFFLINE:
366
367 results[name] = RpcResult(node=name, offline=True, call=procedure)
368 else:
369 requests[name] = \
370 http.client.HttpClientRequest(str(ip), port,
371 http.HTTP_POST, str("/%s" % procedure),
372 headers=_RPC_CLIENT_HEADERS,
373 post_data=body[name],
374 read_timeout=read_timeout,
375 nicename="%s/%s" % (name, procedure),
376 curl_config_fn=_ConfigRpcCurl)
377
378 return (results, requests)
379
380 @staticmethod
382 """Combines pre-computed results for offline hosts with actual call results.
383
384 """
385 for name, req in requests.items():
386 if req.success and req.resp_status_code == http.HTTP_OK:
387 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
388 node=name, call=procedure)
389 else:
390
391 if req.error:
392 msg = req.error
393 else:
394 msg = req.resp_body
395
396 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
397 host_result = RpcResult(data=msg, failed=True, node=name,
398 call=procedure)
399
400 results[name] = host_result
401
402 return results
403
def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
             _req_process_fn=None):
  """Sends one RPC request to each of the given nodes.

  @type hosts: sequence
  @param hosts: hostnames of the target nodes
  @type procedure: string
  @param procedure: request path, i.e. the name of the remote procedure
  @type body: dictionary
  @param body: request body for every host, indexed by hostname
  @type read_timeout: int
  @param read_timeout: read timeout for the requests; must not be C{None}
  @param resolver_opts: passed unchanged to the resolver as second
    argument
  @param _req_process_fn: replacement for
    C{http.client.ProcessRequests}; called with the request objects and
    the C{lock_monitor_cb} keyword argument
  @return: whatever C{_CombineResults} returns for the pre-computed
    (offline) results and the processed requests

  """
  assert read_timeout is not None, \
    "Missing RPC read timeout for procedure '%s'" % procedure

  process_fn = _req_process_fn
  if process_fn is None:
    process_fn = http.client.ProcessRequests

  resolved_hosts = self._resolver(hosts, resolver_opts)

  # Offline hosts get their result right away, all others a request object
  (offline_results, pending) = \
    self._PrepareRequests(resolved_hosts, self._port, procedure, body,
                          read_timeout)

  process_fn(pending.values(), lock_monitor_cb=self._lock_monitor_cb)

  # A host must be either offline or contacted, never both
  assert not frozenset(offline_results).intersection(pending)

  return self._CombineResults(offline_results, pending, procedure)
433
436 - def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
437 _req_process_fn=None):
446
447 @staticmethod
449 """Encode argument.
450
451 """
452 if argkind is None:
453 return value
454 else:
455 return encoder_fn(argkind)(value)
456
def _Call(self, cdef, node_list, args):
  """Entry point for automatically generated RPC wrappers.

  Encodes the arguments, builds one request body per node, runs the
  call through C{self._proc} and optionally post-processes the results.

  @type cdef: tuple
  @param cdef: call definition with eight elements, unpacked as
    C{(procedure, _, resolver_opts, timeout, argdefs, prep_fn,
    postproc_fn, _)}; C{timeout} and C{resolver_opts} may be callables,
    in which case they are called with C{args} to compute the value
  @type node_list: list
  @param node_list: names of the nodes to contact
  @type args: sequence
  @param args: call arguments, one per entry in C{argdefs}
  @rtype: dict
  @return: results indexed by the keys returned by C{self._proc}, each
    value passed through C{postproc_fn} if one is defined
  @raise errors.ProgrammerError: if the number of arguments differs
    from the number of argument definitions

  """
  (procedure, _, resolver_opts, timeout, argdefs,
   prep_fn, postproc_fn, _) = cdef

  if callable(timeout):
    read_timeout = timeout(args)
  else:
    read_timeout = timeout

  if callable(resolver_opts):
    req_resolver_opts = resolver_opts(args)
  else:
    req_resolver_opts = resolver_opts

  if len(args) != len(argdefs):
    raise errors.ProgrammerError("Number of passed arguments doesn't match")

  # Must be a real list: it is serialized once per node when a preparation
  # function is used, which a one-shot iterator (map() on Python 3) would
  # not survive. The encoder takes a single (kind, value) tuple.
  enc_args = [self._encoder((compat.snd(argdef), arg))
              for (argdef, arg) in zip(argdefs, args)]

  if prep_fn is None:
    # Same body for all nodes, hence serialize only once
    body = serializer.DumpJson(enc_args)
    pnbody = dict((n, body) for n in node_list)
  else:
    # The preparation function can customize the arguments for each node
    assert callable(prep_fn)
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                  for n in node_list)

  result = self._proc(node_list, procedure, pnbody, read_timeout,
                      req_resolver_opts)

  if postproc_fn:
    # Generator expression instead of a tuple-parameter lambda, which is
    # not valid syntax on Python 3
    return dict((key, postproc_fn(value))
                for (key, value) in result.items())
  else:
    return result
498
501 """Converts an object to a dictionary.
502
503 @note: See L{objects}.
504
505 """
506 return value.ToDict()
507
510 """Converts a list of L{objects} to dictionaries.
511
512 """
513 return map(_ObjectToDict, value)
514
517 """Encodes a dictionary with node name as key and disk objects as values.
518
519 """
520 return dict((name, _ObjectListToDict(disks))
521 for name, disks in value.items())
522
525 """Loads a file and prepares it for an upload to nodes.
526
527 """
528 statcb = utils.FileStatHelper()
529 data = _Compress(utils.ReadFile(filename, preread=statcb))
530 st = statcb.st
531
532 if getents_fn is None:
533 getents_fn = runtime.GetEnts
534
535 getents = getents_fn()
536
537 return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
538 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
539
542 """Encodes disks for finalizing export.
543
544 """
545 flat_disks = []
546
547 for disk in snap_disks:
548 if isinstance(disk, bool):
549 flat_disks.append(disk)
550 else:
551 flat_disks.append(disk.ToDict())
552
553 return flat_disks
554
557 """Encodes import/export I/O information.
558
559 """
560 if ieio == constants.IEIO_RAW_DISK:
561 assert len(ieioargs) == 1
562 return (ieio, (ieioargs[0].ToDict(), ))
563
564 if ieio == constants.IEIO_SCRIPT:
565 assert len(ieioargs) == 2
566 return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
567
568 return (ieio, ieioargs)
569
572 """Encodes information for renaming block devices.
573
574 """
575 return [(d.ToDict(), uid) for d, uid in value]
576
579 """Annotates just DRBD disks layouts.
580
581 """
582 assert disk.dev_type == constants.LD_DRBD8
583
584 disk.params = objects.FillDict(drbd_params, disk.params)
585 (dev_data, dev_meta) = disk.children
586 dev_data.params = objects.FillDict(data_params, dev_data.params)
587 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
588
589 return disk
590
593 """Generic disk parameter annotation routine.
594
595 """
596 assert disk.dev_type != constants.LD_DRBD8
597
598 disk.params = objects.FillDict(params, disk.params)
599
600 return disk
601
604 """Annotates the disk objects with the disk parameters.
605
606 @param template: The disk template used
607 @param disks: The list of disks objects to annotate
608 @param disk_params: The disk parameters for annotation
609 @returns: A list of disk objects annotated
610
611 """
612 ld_params = objects.Disk.ComputeLDParams(template, disk_params)
613
614 if template == constants.DT_DRBD8:
615 annotation_fn = _AnnotateDParamsDRBD
616 elif template == constants.DT_DISKLESS:
617 annotation_fn = lambda disk, _: disk
618 else:
619 annotation_fn = _AnnotateDParamsGeneric
620
621 new_disks = []
622 for disk in disks:
623 new_disks.append(annotation_fn(disk.Copy(), ld_params))
624
625 return new_disks
626
627
628
# Argument encoders that work without any per-instance state, indexed by
# argument kind. RpcRunner extends a copy of this mapping with encoders
# bound to its own instance.
_ENCODERS = {
  # Conversion of objects via their ToDict() method
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,

  # Data compression
  rpc_defs.ED_COMPRESS: _Compress,

  # Import/export and block device helpers
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
638
639
640 -class RpcRunner(_RpcClientBase,
641 _generated_rpc.RpcClientDefault,
642 _generated_rpc.RpcClientBootstrap,
643 _generated_rpc.RpcClientDnsOnly,
644 _generated_rpc.RpcClientConfig):
645 """RPC runner class.
646
647 """
def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
  """Initializes the RPC runner.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Configuration; kept for the instance and disk encoders and
    used to build the node resolver
  @type lock_monitor_cb: callable
  @param lock_monitor_cb: Lock monitor callback
  @param _req_process_fn: passed on to L{_RpcClientBase}
  @param _getents: bound as first argument of L{_PrepareFileUpload}

  """
  self._cfg = cfg

  # Start with the generic encoders and add those needing this instance
  encoders = dict(_ENCODERS)

  # Encoders requiring configuration object
  encoders[rpc_defs.ED_INST_DICT] = self._InstDict
  encoders[rpc_defs.ED_INST_DICT_HVP_BEP_DP] = self._InstDictHvpBepDp
  encoders[rpc_defs.ED_INST_DICT_OSP_DP] = self._InstDictOspDp

  # Encoders annotating disk parameters
  encoders[rpc_defs.ED_DISKS_DICT_DP] = self._DisksDictDP
  encoders[rpc_defs.ED_SINGLE_DISK_DICT_DP] = self._SingleDiskDictDP

  # Encoder with a bound first argument
  encoders[rpc_defs.ED_FILE_DETAILS] = \
    compat.partial(_PrepareFileUpload, _getents)

  # Node addresses are resolved through the configuration
  resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                            cfg.GetAllNodesInfo)

  # Every base class is initialized explicitly, in this exact order
  _RpcClientBase.__init__(self, resolver, encoders.get,
                          lock_monitor_cb=lock_monitor_cb,
                          _req_process_fn=_req_process_fn)
  _generated_rpc.RpcClientConfig.__init__(self)
  _generated_rpc.RpcClientBootstrap.__init__(self)
  _generated_rpc.RpcClientDnsOnly.__init__(self)
  _generated_rpc.RpcClientDefault.__init__(self)
690
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
  """Serializes an instance, filling in cluster-level parameter defaults.

  The dictionary returned by the instance's ToDict() method is amended
  as follows: hypervisor, backend and OS parameters are replaced by
  their filled versions (with the given overrides applied on top), the
  parameters of each NIC are filled with the cluster's default NIC
  parameters, and the disks are replaced by the output of
  C{_DisksDictDP}.

  @type instance: L{objects.Instance}
  @param instance: an Instance object
  @type hvp: dict or None
  @param hvp: a dictionary with overridden hypervisor parameters
  @type bep: dict or None
  @param bep: a dictionary with overridden backend parameters
  @type osp: dict or None
  @param osp: a dictionary with overridden os parameters
  @rtype: dict
  @return: the instance dict, with the parameters filled with the
    cluster defaults

  """
  idict = instance.ToDict()
  cluster = self._cfg.GetClusterInfo()

  hvparams = cluster.FillHV(instance)
  if hvp is not None:
    hvparams.update(hvp)
  idict["hvparams"] = hvparams

  beparams = cluster.FillBE(instance)
  if bep is not None:
    beparams.update(bep)
  idict["beparams"] = beparams

  osparams = cluster.SimpleFillOS(instance.os, instance.osparams)
  if osp is not None:
    osparams.update(osp)
  idict["osparams"] = osparams

  for nic in idict["nics"]:
    nic_defaults = cluster.nicparams[constants.PP_DEFAULT]
    nic["nicparams"] = objects.FillDict(nic_defaults, nic["nicparams"])

  # The disk encoder takes a single (disks, instance) tuple
  idict["disks"] = self._DisksDictDP((instance.disks, instance))

  return idict
727
729 """Wrapper for L{_InstDict}.
730
731 """
732 return self._InstDict(instance, hvp=hvp, bep=bep)
733
735 """Wrapper for L{_InstDict}.
736
737 """
738 return self._InstDict(instance, osp=osparams)
739
748
750 """Wrapper for L{AnnotateDiskParams}.
751
752 """
753 (anno_disk,) = self._DisksDictDP(([disk], instance))
754 return anno_disk
755
756
757 -class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
758 """RPC wrappers for job queue.
759
760 """
761 - def __init__(self, context, address_list):
774
775
776 -class BootstrapRunner(_RpcClientBase,
777 _generated_rpc.RpcClientBootstrap,
778 _generated_rpc.RpcClientDnsOnly):
779 """RPC wrappers for bootstrapping.
780
781 """
794
795
796 -class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797 """RPC wrappers for calls using only DNS.
798
799 """
807
808
809 -class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810 """RPC wrappers for L{config}.
811
812 """
813 - def __init__(self, context, address_list, _req_process_fn=None,
814 _getents=None):
839