1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
53
54
55 from ganeti import _generated_rpc
56
57
58 import ganeti.http.client
59
60
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
63 "Expect:",
64 ]
65
66
67 _OFFLINE = object()
71 """Initializes the module-global HTTP client manager.
72
73 Must be called before using any RPC function and while exactly one thread is
74 running.
75
76 """
77
78
79
80 assert threading.activeCount() == 1, \
81 "Found more than one active thread when initializing pycURL"
82
83 logging.info("Using PycURL %s", pycurl.version)
84
85 pycurl.global_init(pycurl.GLOBAL_ALL)
86
89 """Stops the module-global HTTP client manager.
90
91 Must be called before quitting the program and while exactly one thread is
92 running.
93
94 """
95 pycurl.global_cleanup()
96
110
113 """RPC-wrapper decorator.
114
115 When applied to a function, it runs it with the RPC system
116 initialized, and it shutsdown the system afterwards. This means the
117 function must be called without RPC being initialized.
118
119 """
120 def wrapper(*args, **kwargs):
121 Init()
122 try:
123 return fn(*args, **kwargs)
124 finally:
125 Shutdown()
126 return wrapper
127
130 """Compresses a string for transport over RPC.
131
132 Small amounts of data are not compressed.
133
134 @type data: str
135 @param data: Data
136 @rtype: tuple
137 @return: Encoded data to send
138
139 """
140
141 if len(data) < 512:
142 return (constants.RPC_ENCODING_NONE, data)
143
144
145 return (constants.RPC_ENCODING_ZLIB_BASE64,
146 base64.b64encode(zlib.compress(data, 3)))
147
150 """RPC Result class.
151
152 This class holds an RPC result. It is needed since in multi-node
153 calls we can't raise an exception just because one out of many
154 failed, and therefore we use this class to encapsulate the result.
155
156 @ivar data: the data payload, for successful results, or None
157 @ivar call: the name of the RPC call
158 @ivar node: the name of the node to which we made the call
159 @ivar offline: whether the operation failed because the node was
160 offline, as opposed to actual failure; offline=True will always
161 imply failed=True, in order to allow simpler checking if
162 the user doesn't care about the exact failure mode
163 @ivar fail_msg: the error message if the call failed
164
165 """
166 - def __init__(self, data=None, failed=False, offline=False,
167 call=None, node=None):
168 self.offline = offline
169 self.call = call
170 self.node = node
171
172 if offline:
173 self.fail_msg = "Node is marked offline"
174 self.data = self.payload = None
175 elif failed:
176 self.fail_msg = self._EnsureErr(data)
177 self.data = self.payload = None
178 else:
179 self.data = data
180 if not isinstance(self.data, (tuple, list)):
181 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182 type(self.data))
183 self.payload = None
184 elif len(data) != 2:
185 self.fail_msg = ("RPC layer error: invalid result length (%d), "
186 "expected 2" % len(self.data))
187 self.payload = None
188 elif not self.data[0]:
189 self.fail_msg = self._EnsureErr(self.data[1])
190 self.payload = None
191 else:
192
193 self.fail_msg = None
194 self.payload = data[1]
195
196 for attr_name in ["call", "data", "fail_msg",
197 "node", "offline", "payload"]:
198 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199
200 @staticmethod
202 """Helper to ensure we return a 'True' value for error."""
203 if val:
204 return val
205 else:
206 return "No error information"
207
208 - def Raise(self, msg, prereq=False, ecode=None):
209 """If the result has failed, raise an OpExecError.
210
211 This is used so that LU code doesn't have to check for each
212 result, but instead can call this function.
213
214 """
215 if not self.fail_msg:
216 return
217
218 if not msg:
219 msg = ("Call '%s' to node '%s' has failed: %s" %
220 (self.call, self.node, self.fail_msg))
221 else:
222 msg = "%s: %s" % (msg, self.fail_msg)
223 if prereq:
224 ec = errors.OpPrereqError
225 else:
226 ec = errors.OpExecError
227 if ecode is not None:
228 args = (msg, ecode)
229 else:
230 args = (msg, )
231 raise ec(*args)
232
237 """Return addresses for given node names.
238
239 @type ssconf_ips: bool
240 @param ssconf_ips: Use the ssconf IPs
241 @type node_list: list
242 @param node_list: List of node names
243 @type ssc: class
244 @param ssc: SimpleStore class that is used to obtain node->ip mappings
245 @type nslookup_fn: callable
246 @param nslookup_fn: function use to do NS lookup
247 @rtype: list of tuple; (string, string)
248 @return: List of tuples containing node name and IP address
249
250 """
251 ss = ssc()
252 family = ss.GetPrimaryIPFamily()
253
254 if ssconf_ips:
255 iplist = ss.GetNodePrimaryIPList()
256 ipmap = dict(entry.split() for entry in iplist)
257 else:
258 ipmap = {}
259
260 result = []
261 for node in node_list:
262 ip = ipmap.get(node)
263 if ip is None:
264 ip = nslookup_fn(node, family=family)
265 result.append((node, ip))
266
267 return result
268
272 """Initializes this class.
273
274 """
275 self._addresses = addresses
276
278 """Returns static addresses for hosts.
279
280 """
281 assert len(hosts) == len(self._addresses)
282 return zip(hosts, self._addresses)
283
286 """Checks if a node is online.
287
288 @type name: string
289 @param name: Node name
290 @type node: L{objects.Node} or None
291 @param node: Node object
292
293 """
294 if node is None:
295
296 ip = name
297 elif node.offline and not accept_offline_node:
298 ip = _OFFLINE
299 else:
300 ip = node.primary_ip
301 return (name, ip)
302
321
324 - def __init__(self, resolver, port, lock_monitor_cb=None):
325 """Initializes this class.
326
327 @param resolver: callable accepting a list of hostnames, returning a list
328 of tuples containing name and IP address (IP address can be the name or
329 the special value L{_OFFLINE} to mark offline machines)
330 @type port: int
331 @param port: TCP port
332 @param lock_monitor_cb: Callable for registering with lock monitor
333
334 """
335 self._resolver = resolver
336 self._port = port
337 self._lock_monitor_cb = lock_monitor_cb
338
339 @staticmethod
341 """Prepares requests by sorting offline hosts into separate list.
342
343 @type body: dict
344 @param body: a dictionary with per-host body data
345
346 """
347 results = {}
348 requests = {}
349
350 assert isinstance(body, dict)
351 assert len(body) == len(hosts)
352 assert compat.all(isinstance(v, str) for v in body.values())
353 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354 "%s != %s" % (hosts, body.keys())
355
356 for (name, ip) in hosts:
357 if ip is _OFFLINE:
358
359 results[name] = RpcResult(node=name, offline=True, call=procedure)
360 else:
361 requests[name] = \
362 http.client.HttpClientRequest(str(ip), port,
363 http.HTTP_POST, str("/%s" % procedure),
364 headers=_RPC_CLIENT_HEADERS,
365 post_data=body[name],
366 read_timeout=read_timeout,
367 nicename="%s/%s" % (name, procedure),
368 curl_config_fn=_ConfigRpcCurl)
369
370 return (results, requests)
371
372 @staticmethod
374 """Combines pre-computed results for offline hosts with actual call results.
375
376 """
377 for name, req in requests.items():
378 if req.success and req.resp_status_code == http.HTTP_OK:
379 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380 node=name, call=procedure)
381 else:
382
383 if req.error:
384 msg = req.error
385 else:
386 msg = req.resp_body
387
388 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389 host_result = RpcResult(data=msg, failed=True, node=name,
390 call=procedure)
391
392 results[name] = host_result
393
394 return results
395
396 - def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397 _req_process_fn=None):
398 """Makes an RPC request to a number of nodes.
399
400 @type hosts: sequence
401 @param hosts: Hostnames
402 @type procedure: string
403 @param procedure: Request path
404 @type body: dictionary
405 @param body: dictionary with request bodies per host
406 @type read_timeout: int or None
407 @param read_timeout: Read timeout for request
408 @rtype: dictionary
409 @return: a dictionary mapping host names to rpc.RpcResult objects
410
411 """
412 assert read_timeout is not None, \
413 "Missing RPC read timeout for procedure '%s'" % procedure
414
415 if _req_process_fn is None:
416 _req_process_fn = http.client.ProcessRequests
417
418 (results, requests) = \
419 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420 procedure, body, read_timeout)
421
422 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423
424 assert not frozenset(results).intersection(requests)
425
426 return self._CombineResults(results, requests, procedure)
427
430 - def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431 _req_process_fn=None):
440
441 @staticmethod
443 """Encode argument.
444
445 """
446 if argkind is None:
447 return value
448 else:
449 return encoder_fn(argkind)(value)
450
451 - def _Call(self, cdef, node_list, args):
452 """Entry point for automatically generated RPC wrappers.
453
454 """
455 (procedure, _, resolver_opts, timeout, argdefs,
456 prep_fn, postproc_fn, _) = cdef
457
458 if callable(timeout):
459 read_timeout = timeout(args)
460 else:
461 read_timeout = timeout
462
463 if callable(resolver_opts):
464 req_resolver_opts = resolver_opts(args)
465 else:
466 req_resolver_opts = resolver_opts
467
468 if len(args) != len(argdefs):
469 raise errors.ProgrammerError("Number of passed arguments doesn't match")
470
471 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472 if prep_fn is None:
473
474
475 body = serializer.DumpJson(enc_args)
476 pnbody = dict((n, body) for n in node_list)
477 else:
478
479
480 assert callable(prep_fn)
481 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482 for n in node_list)
483
484 result = self._proc(node_list, procedure, pnbody, read_timeout,
485 req_resolver_opts)
486
487 if postproc_fn:
488 return dict(map(lambda (key, value): (key, postproc_fn(value)),
489 result.items()))
490 else:
491 return result
492
495 """Converts an object to a dictionary.
496
497 @note: See L{objects}.
498
499 """
500 return value.ToDict()
501
504 """Converts a list of L{objects} to dictionaries.
505
506 """
507 return map(_ObjectToDict, value)
508
511 """Encodes a dictionary with node name as key and disk objects as values.
512
513 """
514 return dict((name, _ObjectListToDict(disks))
515 for name, disks in value.items())
516
519 """Loads a file and prepares it for an upload to nodes.
520
521 """
522 statcb = utils.FileStatHelper()
523 data = _Compress(utils.ReadFile(filename, preread=statcb))
524 st = statcb.st
525
526 if getents_fn is None:
527 getents_fn = runtime.GetEnts
528
529 getents = getents_fn()
530
531 virt_filename = vcluster.MakeVirtualPath(filename)
532
533 return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535
538 """Encodes disks for finalizing export.
539
540 """
541 flat_disks = []
542
543 for disk in snap_disks:
544 if isinstance(disk, bool):
545 flat_disks.append(disk)
546 else:
547 flat_disks.append(disk.ToDict())
548
549 return flat_disks
550
553 """Encodes import/export I/O information.
554
555 """
556 if ieio == constants.IEIO_RAW_DISK:
557 assert len(ieioargs) == 1
558 return (ieio, (ieioargs[0].ToDict(), ))
559
560 if ieio == constants.IEIO_SCRIPT:
561 assert len(ieioargs) == 2
562 return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
563
564 return (ieio, ieioargs)
565
568 """Encodes information for renaming block devices.
569
570 """
571 return [(d.ToDict(), uid) for d, uid in value]
572
575 """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
576
577 Converts the data into a single dictionary. This is fine for most use cases,
578 but some require information from more than one volume group or hypervisor.
579
580 @param require_vg_info: raise an error if the returnd vg_info
581 doesn't have any values
582
583 """
584 (bootid, vgs_info, (hv_info, )) = data
585
586 ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
587
588 if require_vg_info or vgs_info:
589 (vg0_info, ) = vgs_info
590 ret = utils.JoinDisjointDicts(vg0_info, ret)
591
592 return ret
593
596 """Annotates just DRBD disks layouts.
597
598 """
599 assert disk.dev_type == constants.LD_DRBD8
600
601 disk.params = objects.FillDict(drbd_params, disk.params)
602 (dev_data, dev_meta) = disk.children
603 dev_data.params = objects.FillDict(data_params, dev_data.params)
604 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
605
606 return disk
607
610 """Generic disk parameter annotation routine.
611
612 """
613 assert disk.dev_type != constants.LD_DRBD8
614
615 disk.params = objects.FillDict(params, disk.params)
616
617 return disk
618
621 """Annotates the disk objects with the disk parameters.
622
623 @param template: The disk template used
624 @param disks: The list of disks objects to annotate
625 @param disk_params: The disk paramaters for annotation
626 @returns: A list of disk objects annotated
627
628 """
629 ld_params = objects.Disk.ComputeLDParams(template, disk_params)
630
631 if template == constants.DT_DRBD8:
632 annotation_fn = _AnnotateDParamsDRBD
633 elif template == constants.DT_DISKLESS:
634 annotation_fn = lambda disk, _: disk
635 else:
636 annotation_fn = _AnnotateDParamsGeneric
637
638 return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
639
647
650 """Return the exclusive storage flag for all the given nodes.
651
652 @type cfg: L{config.ConfigWriter}
653 @param cfg: cluster configuration
654 @type nodelist: list or tuple
655 @param nodelist: node names for which to read the flag
656 @rtype: dict
657 @return: mapping from node names to exclusive storage flags
658 @raise errors.OpPrereqError: if any given node name has no corresponding node
659
660 """
661 getflag = lambda n: _GetESFlag(cfg, n)
662 flags = map(getflag, nodelist)
663 return dict(zip(nodelist, flags))
664
665
666
667 _ENCODERS = {
668 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
669 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
670 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
671 rpc_defs.ED_COMPRESS: _Compress,
672 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
673 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
674 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
675 }
676
677
678 -class RpcRunner(_RpcClientBase,
679 _generated_rpc.RpcClientDefault,
680 _generated_rpc.RpcClientBootstrap,
681 _generated_rpc.RpcClientDnsOnly,
682 _generated_rpc.RpcClientConfig):
683 """RPC runner class.
684
685 """
686 - def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
687 """Initialized the RPC runner.
688
689 @type cfg: L{config.ConfigWriter}
690 @param cfg: Configuration
691 @type lock_monitor_cb: callable
692 @param lock_monitor_cb: Lock monitor callback
693
694 """
695 self._cfg = cfg
696
697 encoders = _ENCODERS.copy()
698
699 encoders.update({
700
701 rpc_defs.ED_INST_DICT: self._InstDict,
702 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
703 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
704 rpc_defs.ED_NIC_DICT: self._NicDict,
705
706
707 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
708 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
709
710
711 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
712 })
713
714
715 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
716 cfg.GetAllNodesInfo)
717
718
719
720
721
722 _RpcClientBase.__init__(self, resolver, encoders.get,
723 lock_monitor_cb=lock_monitor_cb,
724 _req_process_fn=_req_process_fn)
725 _generated_rpc.RpcClientConfig.__init__(self)
726 _generated_rpc.RpcClientBootstrap.__init__(self)
727 _generated_rpc.RpcClientDnsOnly.__init__(self)
728 _generated_rpc.RpcClientDefault.__init__(self)
729
741
742 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
743 """Convert the given instance to a dict.
744
745 This is done via the instance's ToDict() method and additionally
746 we fill the hvparams with the cluster defaults.
747
748 @type instance: L{objects.Instance}
749 @param instance: an Instance object
750 @type hvp: dict or None
751 @param hvp: a dictionary with overridden hypervisor parameters
752 @type bep: dict or None
753 @param bep: a dictionary with overridden backend parameters
754 @type osp: dict or None
755 @param osp: a dictionary with overridden os parameters
756 @rtype: dict
757 @return: the instance dict, with the hvparams filled with the
758 cluster defaults
759
760 """
761 idict = instance.ToDict()
762 cluster = self._cfg.GetClusterInfo()
763 idict["hvparams"] = cluster.FillHV(instance)
764 if hvp is not None:
765 idict["hvparams"].update(hvp)
766 idict["beparams"] = cluster.FillBE(instance)
767 if bep is not None:
768 idict["beparams"].update(bep)
769 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
770 if osp is not None:
771 idict["osparams"].update(osp)
772 idict["disks"] = self._DisksDictDP((instance.disks, instance))
773 for nic in idict["nics"]:
774 nic["nicparams"] = objects.FillDict(
775 cluster.nicparams[constants.PP_DEFAULT],
776 nic["nicparams"])
777 network = nic.get("network", None)
778 if network:
779 net_uuid = self._cfg.LookupNetwork(network)
780 if net_uuid:
781 nobj = self._cfg.GetNetwork(net_uuid)
782 nic["netinfo"] = objects.Network.ToDict(nobj)
783 return idict
784
786 """Wrapper for L{_InstDict}.
787
788 """
789 return self._InstDict(instance, hvp=hvp, bep=bep)
790
792 """Wrapper for L{_InstDict}.
793
794 """
795 return self._InstDict(instance, osp=osparams)
796
805
807 """Wrapper for L{AnnotateDiskParams}.
808
809 """
810 (anno_disk,) = self._DisksDictDP(([disk], instance))
811 return anno_disk
812
813
814 -class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
815 """RPC wrappers for job queue.
816
817 """
818 - def __init__(self, context, address_list):
831
832
833 -class BootstrapRunner(_RpcClientBase,
834 _generated_rpc.RpcClientBootstrap,
835 _generated_rpc.RpcClientDnsOnly):
836 """RPC wrappers for bootstrapping.
837
838 """
851
852
853 -class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
854 """RPC wrappers for calls using only DNS.
855
856 """
864
865
866 -class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
867 """RPC wrappers for L{config}.
868
869 """
870 - def __init__(self, context, address_list, _req_process_fn=None,
871 _getents=None):
896