1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Inter-node RPC library.
32
33 """
34
35
36
37
38
39
40
41
42 import logging
43 import zlib
44 import base64
45 import pycurl
46 import threading
47 import copy
48 import os
49
50 from ganeti import utils
51 from ganeti import objects
52 from ganeti import http
53 from ganeti import serializer
54 from ganeti import constants
55 from ganeti import errors
56 from ganeti import netutils
57 from ganeti import ssconf
58 from ganeti import runtime
59 from ganeti import compat
60 from ganeti import rpc_defs
61 from ganeti import pathutils
62 from ganeti import vcluster
63
64
65 from ganeti import _generated_rpc
66
67
68 import ganeti.http.client
69
70
71 _RPC_CLIENT_HEADERS = [
72 "Content-type: %s" % http.HTTP_APP_JSON,
73 "Expect:",
74 ]
75
76
77 _OFFLINE = object()
81 """Initializes the module-global HTTP client manager.
82
83 Must be called before using any RPC function and while exactly one thread is
84 running.
85
86 """
87
88
89
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
92
93 logging.info("Using PycURL %s", pycurl.version)
94
95 pycurl.global_init(pycurl.GLOBAL_ALL)
96
99 """Stops the module-global HTTP client manager.
100
101 Must be called before quitting the program and while exactly one thread is
102 running.
103
104 """
105 pycurl.global_cleanup()
106
def _ConfigRpcCurl(curl):
  """Configures a pycURL object for an RPC call.

  Sets TLS options (CA bundle, client certificate and key) and the connection
  timeout on the given handle.

  @param curl: pycURL object to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)
  noded_client_cert = str(pathutils.NODED_CLIENT_CERT_FILE)

  # If the dedicated client certificate does not exist (yet), fall back to
  # using the server certificate as the client certificate.
  if not os.path.exists(noded_client_cert):
    logging.info("Using server certificate as client certificate for RPC"
                 " call.")
    noded_client_cert = noded_cert

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # NOTE(review): host name verification is disabled here; only the peer
  # certificate is checked against the CA — confirm this is intentional.
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_client_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_client_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
128
131 """RPC-wrapper decorator.
132
133 When applied to a function, it runs it with the RPC system
134 initialized, and it shutsdown the system afterwards. This means the
135 function must be called without RPC being initialized.
136
137 """
138 def wrapper(*args, **kwargs):
139 Init()
140 try:
141 return fn(*args, **kwargs)
142 finally:
143 Shutdown()
144 return wrapper
145
148 """Compresses a string for transport over RPC.
149
150 Small amounts of data are not compressed.
151
152 @type data: str
153 @param data: Data
154 @rtype: tuple
155 @return: Encoded data to send
156
157 """
158
159 if len(data) < 512:
160 return (constants.RPC_ENCODING_NONE, data)
161
162
163 return (constants.RPC_ENCODING_ZLIB_BASE64,
164 base64.b64encode(zlib.compress(data, 3)))
165
168 """RPC Result class.
169
170 This class holds an RPC result. It is needed since in multi-node
171 calls we can't raise an exception just because one out of many
172 failed, and therefore we use this class to encapsulate the result.
173
174 @ivar data: the data payload, for successful results, or None
175 @ivar call: the name of the RPC call
176 @ivar node: the name of the node to which we made the call
177 @ivar offline: whether the operation failed because the node was
178 offline, as opposed to actual failure; offline=True will always
179 imply failed=True, in order to allow simpler checking if
180 the user doesn't care about the exact failure mode
181 @ivar fail_msg: the error message if the call failed
182
183 """
184 - def __init__(self, data=None, failed=False, offline=False,
185 call=None, node=None):
186 self.offline = offline
187 self.call = call
188 self.node = node
189
190 if offline:
191 self.fail_msg = "Node is marked offline"
192 self.data = self.payload = None
193 elif failed:
194 self.fail_msg = self._EnsureErr(data)
195 self.data = self.payload = None
196 else:
197 self.data = data
198 if not isinstance(self.data, (tuple, list)):
199 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
200 type(self.data))
201 self.payload = None
202 elif len(data) != 2:
203 self.fail_msg = ("RPC layer error: invalid result length (%d), "
204 "expected 2" % len(self.data))
205 self.payload = None
206 elif not self.data[0]:
207 self.fail_msg = self._EnsureErr(self.data[1])
208 self.payload = None
209 else:
210
211 self.fail_msg = None
212 self.payload = data[1]
213
214 for attr_name in ["call", "data", "fail_msg",
215 "node", "offline", "payload"]:
216 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
217
219 return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
220 (self.offline, self.call, self.node, self.offline, self.fail_msg))
221
222 @staticmethod
224 """Helper to ensure we return a 'True' value for error."""
225 if val:
226 return val
227 else:
228 return "No error information"
229
230 - def Raise(self, msg, prereq=False, ecode=None):
231 """If the result has failed, raise an OpExecError.
232
233 This is used so that LU code doesn't have to check for each
234 result, but instead can call this function.
235
236 """
237 if not self.fail_msg:
238 return
239
240 if not msg:
241 msg = ("Call '%s' to node '%s' has failed: %s" %
242 (self.call, self.node, self.fail_msg))
243 else:
244 msg = "%s: %s" % (msg, self.fail_msg)
245 if prereq:
246 ec = errors.OpPrereqError
247 else:
248 ec = errors.OpExecError
249 if ecode is not None:
250 args = (msg, ecode)
251 else:
252 args = (msg, )
253 raise ec(*args)
254
255 - def Warn(self, msg, feedback_fn):
256 """If the result has failed, call the feedback_fn.
257
258 This is used to in cases were LU wants to warn the
259 user about a failure, but continue anyway.
260
261 """
262 if not self.fail_msg:
263 return
264
265 msg = "%s: %s" % (msg, self.fail_msg)
266 feedback_fn(msg)
267
272 """Return addresses for given node names.
273
274 @type ssconf_ips: bool
275 @param ssconf_ips: Use the ssconf IPs
276 @type node_list: list
277 @param node_list: List of node names
278 @type ssc: class
279 @param ssc: SimpleStore class that is used to obtain node->ip mappings
280 @type nslookup_fn: callable
281 @param nslookup_fn: function use to do NS lookup
282 @rtype: list of tuple; (string, string)
283 @return: List of tuples containing node name and IP address
284
285 """
286 ss = ssc()
287 family = ss.GetPrimaryIPFamily()
288
289 if ssconf_ips:
290 iplist = ss.GetNodePrimaryIPList()
291 ipmap = dict(entry.split() for entry in iplist)
292 else:
293 ipmap = {}
294
295 result = []
296 for node in node_list:
297 ip = ipmap.get(node)
298 if ip is None:
299 ip = nslookup_fn(node, family=family)
300 result.append((node, ip, node))
301
302 return result
303
307 """Initializes this class.
308
309 """
310 self._addresses = addresses
311
313 """Returns static addresses for hosts.
314
315 """
316 assert len(hosts) == len(self._addresses)
317 return zip(hosts, self._addresses, hosts)
318
321 """Checks if a node is online.
322
323 @type node_uuid_or_name: string
324 @param node_uuid_or_name: Node UUID
325 @type node: L{objects.Node} or None
326 @param node: Node object
327
328 """
329 if node is None:
330
331
332 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
333 else:
334 if node.offline and not accept_offline_node:
335 ip = _OFFLINE
336 else:
337 ip = node.primary_ip
338 return (node.name, ip, node_uuid_or_name)
339
342 """Calculate node addresses using configuration.
343
344 Note that strings in node_uuids are treated as node names if the UUID is not
345 found in the configuration.
346
347 """
348 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
349
350 assert accept_offline_node or opts is None, "Unknown option"
351
352
353 if len(node_uuids) == 1:
354 (uuid, ) = node_uuids
355 return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
356 else:
357 all_nodes = all_nodes_fn()
358 return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
359 accept_offline_node)
360 for uuid in node_uuids]
361
364 - def __init__(self, resolver, port, lock_monitor_cb=None):
365 """Initializes this class.
366
367 @param resolver: callable accepting a list of node UUIDs or hostnames,
368 returning a list of tuples containing name, IP address and original name
369 of the resolved node. IP address can be the name or the special value
370 L{_OFFLINE} to mark offline machines.
371 @type port: int
372 @param port: TCP port
373 @param lock_monitor_cb: Callable for registering with lock monitor
374
375 """
376 self._resolver = resolver
377 self._port = port
378 self._lock_monitor_cb = lock_monitor_cb
379
380 @staticmethod
382 """Prepares requests by sorting offline hosts into separate list.
383
384 @type body: dict
385 @param body: a dictionary with per-host body data
386
387 """
388 results = {}
389 requests = {}
390
391 assert isinstance(body, dict)
392 assert len(body) == len(hosts)
393 assert compat.all(isinstance(v, str) for v in body.values())
394 assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
395 "%s != %s" % (hosts, body.keys())
396
397 for (name, ip, original_name) in hosts:
398 if ip is _OFFLINE:
399
400 results[original_name] = RpcResult(node=name,
401 offline=True,
402 call=procedure)
403 else:
404 requests[original_name] = \
405 http.client.HttpClientRequest(str(ip), port,
406 http.HTTP_POST, str("/%s" % procedure),
407 headers=_RPC_CLIENT_HEADERS,
408 post_data=body[original_name],
409 read_timeout=read_timeout,
410 nicename="%s/%s" % (name, procedure),
411 curl_config_fn=_ConfigRpcCurl)
412
413 return (results, requests)
414
415 @staticmethod
417 """Combines pre-computed results for offline hosts with actual call results.
418
419 """
420 for name, req in requests.items():
421 if req.success and req.resp_status_code == http.HTTP_OK:
422 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
423 node=name, call=procedure)
424 else:
425
426 if req.error:
427 msg = req.error
428 else:
429 msg = req.resp_body
430
431 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
432 host_result = RpcResult(data=msg, failed=True, node=name,
433 call=procedure)
434
435 results[name] = host_result
436
437 return results
438
439 - def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
440 _req_process_fn=None):
441 """Makes an RPC request to a number of nodes.
442
443 @type nodes: sequence
444 @param nodes: node UUIDs or Hostnames
445 @type procedure: string
446 @param procedure: Request path
447 @type body: dictionary
448 @param body: dictionary with request bodies per host
449 @type read_timeout: int or None
450 @param read_timeout: Read timeout for request
451 @rtype: dictionary
452 @return: a dictionary mapping host names to rpc.RpcResult objects
453
454 """
455 assert read_timeout is not None, \
456 "Missing RPC read timeout for procedure '%s'" % procedure
457
458 if _req_process_fn is None:
459 _req_process_fn = http.client.ProcessRequests
460
461 (results, requests) = \
462 self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
463 procedure, body, read_timeout)
464
465 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
466
467 assert not frozenset(results).intersection(requests)
468
469 return self._CombineResults(results, requests, procedure)
470
473 - def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
474 _req_process_fn=None):
483
484 @staticmethod
485 - def _EncodeArg(encoder_fn, node, (argkind, value)):
486 """Encode argument.
487
488 """
489 if argkind is None:
490 return value
491 else:
492 return encoder_fn(argkind)(node, value)
493
494 - def _Call(self, cdef, node_list, args):
495 """Entry point for automatically generated RPC wrappers.
496
497 """
498 (procedure, _, resolver_opts, timeout, argdefs,
499 prep_fn, postproc_fn, _) = cdef
500
501 if callable(timeout):
502 read_timeout = timeout(args)
503 else:
504 read_timeout = timeout
505
506 if callable(resolver_opts):
507 req_resolver_opts = resolver_opts(args)
508 else:
509 req_resolver_opts = resolver_opts
510
511 if len(args) != len(argdefs):
512 raise errors.ProgrammerError("Number of passed arguments doesn't match")
513
514 if prep_fn is None:
515 prep_fn = lambda _, args: args
516 assert callable(prep_fn)
517
518
519
520 encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
521 zip(map(compat.snd, argdefs), args))
522 pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
523 for n in node_list)
524
525 result = self._proc(node_list, procedure, pnbody, read_timeout,
526 req_resolver_opts)
527
528 if postproc_fn:
529 return dict(map(lambda (key, value): (key, postproc_fn(value)),
530 result.items()))
531 else:
532 return result
533
536 """Converts an object to a dictionary.
537
538 @note: See L{objects}.
539
540 """
541 return value.ToDict()
542
549
568
571 """Encodes disks for finalizing export.
572
573 """
574 flat_disks = []
575
576 for disk in snap_disks:
577 if isinstance(disk, bool):
578 flat_disks.append(disk)
579 else:
580 flat_disks.append(disk.ToDict())
581
582 return flat_disks
583
586 """Encodes information for renaming block devices.
587
588 """
589 return [(d.ToDict(), uid) for d, uid in value]
590
593 """Extracts the spindle information from the space info and adds
594 it to the result dictionary.
595
596 @type result: dict of strings
597 @param result: dictionary holding the result of the legacy node info
598 @type space_info: list of dicts of strings
599 @param space_info: list, each row holding space information of one storage
600 unit
601 @rtype: None
602 @return: does not return anything, manipulates the C{result} variable
603
604 """
605 lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
606 space_info, constants.ST_LVM_PV)
607 if lvm_pv_info:
608 result["spindles_free"] = lvm_pv_info["storage_free"]
609 result["spindles_total"] = lvm_pv_info["storage_size"]
610 else:
611 result["spindles_free"] = 0
612 result["spindles_total"] = 0
613
617 """Extracts the storage space information of the disk template from
618 the space info and adds it to the result dictionary.
619
620 @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
621
622 """
623 if utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
624 disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
625 space_info, disk_template)
626 result["name"] = disk_info["name"]
627 result["storage_free"] = disk_info["storage_free"]
628 result["storage_size"] = disk_info["storage_size"]
629 else:
630
631 result["storage_free"] = 0
632 result["storage_size"] = 0
633
636 """Formats the data returned by call_node_info.
637
638 Converts the data into a single dictionary. This is fine for most use cases,
639 but some require information from more than one volume group or hypervisor.
640
641 """
642 (bootid, space_info, (hv_info, )) = data
643
644 ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
645
646 _AddSpindlesToLegacyNodeInfo(ret, space_info)
647 _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)
648
649 return ret
650
653 """Annotates just DRBD disks layouts.
654
655 """
656 assert disk.dev_type == constants.DT_DRBD8
657
658 disk.params = objects.FillDict(drbd_params, disk.params)
659 (dev_data, dev_meta) = disk.children
660 dev_data.params = objects.FillDict(data_params, dev_data.params)
661 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
662
663 return disk
664
667 """Generic disk parameter annotation routine.
668
669 """
670 assert disk.dev_type != constants.DT_DRBD8
671
672 disk.params = objects.FillDict(params, disk.params)
673
674 return disk
675
678 """Annotates the disk objects with the disk parameters.
679
680 @param disks: The list of disks objects to annotate
681 @param disk_params: The disk parameters for annotation
682 @returns: A list of disk objects annotated
683
684 """
685 def AnnotateDisk(disk):
686 if disk.dev_type == constants.DT_DISKLESS:
687 return disk
688
689 ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
690
691 if disk.dev_type == constants.DT_DRBD8:
692 return _AnnotateDParamsDRBD(disk, ld_params)
693 else:
694 return _AnnotateDParamsGeneric(disk, ld_params)
695
696 return [AnnotateDisk(disk.Copy()) for disk in disks]
697
705
708 """Adds the exclusive storage flag to lvm units.
709
710 This function creates a copy of the storage_units lists, with the
711 es_flag being added to all lvm storage units.
712
713 @type storage_units: list of pairs (string, string)
714 @param storage_units: list of 'raw' storage units, consisting only of
715 (storage_type, storage_key)
716 @type es_flag: boolean
717 @param es_flag: exclusive storage flag
718 @rtype: list of tuples (string, string, list)
719 @return: list of storage units (storage_type, storage_key, params) with
720 the params containing the es_flag for lvm-vg storage units
721
722 """
723 result = []
724 for (storage_type, storage_key) in storage_units:
725 if storage_type in [constants.ST_LVM_VG]:
726 result.append((storage_type, storage_key, [es_flag]))
727 if es_flag:
728 result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
729 else:
730 result.append((storage_type, storage_key, []))
731 return result
732
735 """Return the exclusive storage flag for all the given nodes.
736
737 @type cfg: L{config.ConfigWriter}
738 @param cfg: cluster configuration
739 @type node_uuids: list or tuple
740 @param node_uuids: node UUIDs for which to read the flag
741 @rtype: dict
742 @return: mapping from node uuids to exclusive storage flags
743 @raise errors.OpPrereqError: if any given node name has no corresponding
744 node
745
746 """
747 getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
748 flags = map(getflag, node_uuids)
749 return dict(zip(node_uuids, flags))
750
753 """Return the lvm storage unit for all the given nodes.
754
755 Main purpose of this function is to map the exclusive storage flag, which
756 can be different for each node, to the default LVM storage unit.
757
758 @type cfg: L{config.ConfigWriter}
759 @param cfg: cluster configuration
760 @type storage_units: list of pairs (string, string)
761 @param storage_units: list of 'raw' storage units, e.g. pairs of
762 (storage_type, storage_key)
763 @type node_uuids: list or tuple
764 @param node_uuids: node UUIDs for which to read the flag
765 @rtype: dict
766 @return: mapping from node uuids to a list of storage units which include
767 the exclusive storage flag for lvm storage
768 @raise errors.OpPrereqError: if any given node name has no corresponding
769 node
770
771 """
772 getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
773 storage_units, _GetExclusiveStorageFlag(cfg, n))
774 flags = map(getunit, node_uuids)
775 return dict(zip(node_uuids, flags))
776
777
778
779 _ENCODERS = {
780 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
781 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
782 rpc_defs.ED_COMPRESS: _Compress,
783 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
784 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
785 }
786
787
788 -class RpcRunner(_RpcClientBase,
789 _generated_rpc.RpcClientDefault,
790 _generated_rpc.RpcClientBootstrap,
791 _generated_rpc.RpcClientDnsOnly,
792 _generated_rpc.RpcClientConfig):
793 """RPC runner class.
794
795 """
796 - def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
797 """Initialized the RPC runner.
798
799 @type cfg: L{config.ConfigWriter}
800 @param cfg: Configuration
801 @type lock_monitor_cb: callable
802 @param lock_monitor_cb: Lock monitor callback
803
804 """
805 self._cfg = cfg
806
807 encoders = _ENCODERS.copy()
808
809 encoders.update({
810
811 rpc_defs.ED_INST_DICT: self._InstDict,
812 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
813 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
814 rpc_defs.ED_NIC_DICT: self._NicDict,
815 rpc_defs.ED_DEVICE_DICT: self._DeviceDict,
816
817
818 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
819 rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
820 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
821 rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
822
823
824 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
825
826 rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
827 })
828
829
830 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
831 cfg.GetAllNodesInfo)
832
833
834
835
836
837 _RpcClientBase.__init__(self, resolver, encoders.get,
838 lock_monitor_cb=lock_monitor_cb,
839 _req_process_fn=_req_process_fn)
840 _generated_rpc.RpcClientConfig.__init__(self)
841 _generated_rpc.RpcClientBootstrap.__init__(self)
842 _generated_rpc.RpcClientDnsOnly.__init__(self)
843 _generated_rpc.RpcClientDefault.__init__(self)
844
856
862
863 - def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
864 """Convert the given instance to a dict.
865
866 This is done via the instance's ToDict() method and additionally
867 we fill the hvparams with the cluster defaults.
868
869 @type instance: L{objects.Instance}
870 @param instance: an Instance object
871 @type hvp: dict or None
872 @param hvp: a dictionary with overridden hypervisor parameters
873 @type bep: dict or None
874 @param bep: a dictionary with overridden backend parameters
875 @type osp: dict or None
876 @param osp: a dictionary with overridden os parameters
877 @rtype: dict
878 @return: the instance dict, with the hvparams filled with the
879 cluster defaults
880
881 """
882 idict = instance.ToDict()
883 cluster = self._cfg.GetClusterInfo()
884 idict["hvparams"] = cluster.FillHV(instance)
885 if hvp is not None:
886 idict["hvparams"].update(hvp)
887 idict["beparams"] = cluster.FillBE(instance)
888 if bep is not None:
889 idict["beparams"].update(bep)
890 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
891 if osp is not None:
892 idict["osparams"].update(osp)
893 idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
894 for nic in idict["nics"]:
895 nic["nicparams"] = objects.FillDict(
896 cluster.nicparams[constants.PP_DEFAULT],
897 nic["nicparams"])
898 network = nic.get("network", None)
899 if network:
900 net_uuid = self._cfg.LookupNetwork(network)
901 if net_uuid:
902 nobj = self._cfg.GetNetwork(net_uuid)
903 nic["netinfo"] = objects.Network.ToDict(nobj)
904 return idict
905
911
917
934
936 """Wrapper for L{AnnotateDiskParams}.
937
938 Supports a list of (disk, instance) tuples.
939 """
940 return [disk for disk_inst in disks_insts
941 for disk in self._DisksDictDP(node, disk_inst)]
942
944 """Wrapper for L{AnnotateDiskParams}.
945
946 """
947 (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
948 return anno_disk
949
951 """Encode dict of node name -> list of (disk, instance) tuples as values.
952
953 """
954 return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
955 for name, disks in value.items())
956
970
971
972 -class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
973 """RPC wrappers for job queue.
974
975 """
976 - def __init__(self, context, address_list):
989
990
991 -class BootstrapRunner(_RpcClientBase,
992 _generated_rpc.RpcClientBootstrap,
993 _generated_rpc.RpcClientDnsOnly):
994 """RPC wrappers for bootstrapping.
995
996 """
1009
1010
1011 -class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
1012 """RPC wrappers for calls using only DNS.
1013
1014 """
1022
1023
1024 -class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
1025 """RPC wrappers for L{config}.
1026
1027 """
1028 - def __init__(self, context, address_list, _req_process_fn=None,
1029 _getents=None):
1054