1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
53
54
55 from ganeti import _generated_rpc
56
57
58 import ganeti.http.client
59
60
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
63 "Expect:",
64 ]
65
66
67 _OFFLINE = object()
71 """Initializes the module-global HTTP client manager.
72
73 Must be called before using any RPC function and while exactly one thread is
74 running.
75
76 """
77
78
79
80 assert threading.activeCount() == 1, \
81 "Found more than one active thread when initializing pycURL"
82
83 logging.info("Using PycURL %s", pycurl.version)
84
85 pycurl.global_init(pycurl.GLOBAL_ALL)
86
89 """Stops the module-global HTTP client manager.
90
91 Must be called before quitting the program and while exactly one thread is
92 running.
93
94 """
95 pycurl.global_cleanup()
96
110
113 """RPC-wrapper decorator.
114
115 When applied to a function, it runs it with the RPC system
116 initialized, and it shutsdown the system afterwards. This means the
117 function must be called without RPC being initialized.
118
119 """
120 def wrapper(*args, **kwargs):
121 Init()
122 try:
123 return fn(*args, **kwargs)
124 finally:
125 Shutdown()
126 return wrapper
127
130 """Compresses a string for transport over RPC.
131
132 Small amounts of data are not compressed.
133
134 @type data: str
135 @param data: Data
136 @rtype: tuple
137 @return: Encoded data to send
138
139 """
140
141 if len(data) < 512:
142 return (constants.RPC_ENCODING_NONE, data)
143
144
145 return (constants.RPC_ENCODING_ZLIB_BASE64,
146 base64.b64encode(zlib.compress(data, 3)))
147
150 """RPC Result class.
151
152 This class holds an RPC result. It is needed since in multi-node
153 calls we can't raise an exception just because one out of many
154 failed, and therefore we use this class to encapsulate the result.
155
156 @ivar data: the data payload, for successful results, or None
157 @ivar call: the name of the RPC call
158 @ivar node: the name of the node to which we made the call
159 @ivar offline: whether the operation failed because the node was
160 offline, as opposed to actual failure; offline=True will always
161 imply failed=True, in order to allow simpler checking if
162 the user doesn't care about the exact failure mode
163 @ivar fail_msg: the error message if the call failed
164
165 """
166 - def __init__(self, data=None, failed=False, offline=False,
167 call=None, node=None):
168 self.offline = offline
169 self.call = call
170 self.node = node
171
172 if offline:
173 self.fail_msg = "Node is marked offline"
174 self.data = self.payload = None
175 elif failed:
176 self.fail_msg = self._EnsureErr(data)
177 self.data = self.payload = None
178 else:
179 self.data = data
180 if not isinstance(self.data, (tuple, list)):
181 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182 type(self.data))
183 self.payload = None
184 elif len(data) != 2:
185 self.fail_msg = ("RPC layer error: invalid result length (%d), "
186 "expected 2" % len(self.data))
187 self.payload = None
188 elif not self.data[0]:
189 self.fail_msg = self._EnsureErr(self.data[1])
190 self.payload = None
191 else:
192
193 self.fail_msg = None
194 self.payload = data[1]
195
196 for attr_name in ["call", "data", "fail_msg",
197 "node", "offline", "payload"]:
198 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199
200 @staticmethod
202 """Helper to ensure we return a 'True' value for error."""
203 if val:
204 return val
205 else:
206 return "No error information"
207
208 - def Raise(self, msg, prereq=False, ecode=None):
209 """If the result has failed, raise an OpExecError.
210
211 This is used so that LU code doesn't have to check for each
212 result, but instead can call this function.
213
214 """
215 if not self.fail_msg:
216 return
217
218 if not msg:
219 msg = ("Call '%s' to node '%s' has failed: %s" %
220 (self.call, self.node, self.fail_msg))
221 else:
222 msg = "%s: %s" % (msg, self.fail_msg)
223 if prereq:
224 ec = errors.OpPrereqError
225 else:
226 ec = errors.OpExecError
227 if ecode is not None:
228 args = (msg, ecode)
229 else:
230 args = (msg, )
231 raise ec(*args)
232
233 - def Warn(self, msg, feedback_fn):
234 """If the result has failed, call the feedback_fn.
235
236 This is used to in cases were LU wants to warn the
237 user about a failure, but continue anyway.
238
239 """
240 if not self.fail_msg:
241 return
242
243 msg = "%s: %s" % (msg, self.fail_msg)
244 feedback_fn(msg)
245
250 """Return addresses for given node names.
251
252 @type ssconf_ips: bool
253 @param ssconf_ips: Use the ssconf IPs
254 @type node_list: list
255 @param node_list: List of node names
256 @type ssc: class
257 @param ssc: SimpleStore class that is used to obtain node->ip mappings
258 @type nslookup_fn: callable
259 @param nslookup_fn: function use to do NS lookup
260 @rtype: list of tuple; (string, string)
261 @return: List of tuples containing node name and IP address
262
263 """
264 ss = ssc()
265 family = ss.GetPrimaryIPFamily()
266
267 if ssconf_ips:
268 iplist = ss.GetNodePrimaryIPList()
269 ipmap = dict(entry.split() for entry in iplist)
270 else:
271 ipmap = {}
272
273 result = []
274 for node in node_list:
275 ip = ipmap.get(node)
276 if ip is None:
277 ip = nslookup_fn(node, family=family)
278 result.append((node, ip, node))
279
280 return result
281
285 """Initializes this class.
286
287 """
288 self._addresses = addresses
289
291 """Returns static addresses for hosts.
292
293 """
294 assert len(hosts) == len(self._addresses)
295 return zip(hosts, self._addresses, hosts)
296
299 """Checks if a node is online.
300
301 @type node_uuid_or_name: string
302 @param node_uuid_or_name: Node UUID
303 @type node: L{objects.Node} or None
304 @param node: Node object
305
306 """
307 if node is None:
308
309
310 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
311 else:
312 if node.offline and not accept_offline_node:
313 ip = _OFFLINE
314 else:
315 ip = node.primary_ip
316 return (node.name, ip, node_uuid_or_name)
317
320 """Calculate node addresses using configuration.
321
322 Note that strings in node_uuids are treated as node names if the UUID is not
323 found in the configuration.
324
325 """
326 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
327
328 assert accept_offline_node or opts is None, "Unknown option"
329
330
331 if len(node_uuids) == 1:
332 (uuid, ) = node_uuids
333 return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
334 else:
335 all_nodes = all_nodes_fn()
336 return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
337 accept_offline_node)
338 for uuid in node_uuids]
339
342 - def __init__(self, resolver, port, lock_monitor_cb=None):
343 """Initializes this class.
344
345 @param resolver: callable accepting a list of node UUIDs or hostnames,
346 returning a list of tuples containing name, IP address and original name
347 of the resolved node. IP address can be the name or the special value
348 L{_OFFLINE} to mark offline machines.
349 @type port: int
350 @param port: TCP port
351 @param lock_monitor_cb: Callable for registering with lock monitor
352
353 """
354 self._resolver = resolver
355 self._port = port
356 self._lock_monitor_cb = lock_monitor_cb
357
358 @staticmethod
360 """Prepares requests by sorting offline hosts into separate list.
361
362 @type body: dict
363 @param body: a dictionary with per-host body data
364
365 """
366 results = {}
367 requests = {}
368
369 assert isinstance(body, dict)
370 assert len(body) == len(hosts)
371 assert compat.all(isinstance(v, str) for v in body.values())
372 assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
373 "%s != %s" % (hosts, body.keys())
374
375 for (name, ip, original_name) in hosts:
376 if ip is _OFFLINE:
377
378 results[original_name] = RpcResult(node=name,
379 offline=True,
380 call=procedure)
381 else:
382 requests[original_name] = \
383 http.client.HttpClientRequest(str(ip), port,
384 http.HTTP_POST, str("/%s" % procedure),
385 headers=_RPC_CLIENT_HEADERS,
386 post_data=body[original_name],
387 read_timeout=read_timeout,
388 nicename="%s/%s" % (name, procedure),
389 curl_config_fn=_ConfigRpcCurl)
390
391 return (results, requests)
392
393 @staticmethod
395 """Combines pre-computed results for offline hosts with actual call results.
396
397 """
398 for name, req in requests.items():
399 if req.success and req.resp_status_code == http.HTTP_OK:
400 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
401 node=name, call=procedure)
402 else:
403
404 if req.error:
405 msg = req.error
406 else:
407 msg = req.resp_body
408
409 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
410 host_result = RpcResult(data=msg, failed=True, node=name,
411 call=procedure)
412
413 results[name] = host_result
414
415 return results
416
417 - def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
418 _req_process_fn=None):
419 """Makes an RPC request to a number of nodes.
420
421 @type nodes: sequence
422 @param nodes: node UUIDs or Hostnames
423 @type procedure: string
424 @param procedure: Request path
425 @type body: dictionary
426 @param body: dictionary with request bodies per host
427 @type read_timeout: int or None
428 @param read_timeout: Read timeout for request
429 @rtype: dictionary
430 @return: a dictionary mapping host names to rpc.RpcResult objects
431
432 """
433 assert read_timeout is not None, \
434 "Missing RPC read timeout for procedure '%s'" % procedure
435
436 if _req_process_fn is None:
437 _req_process_fn = http.client.ProcessRequests
438
439 (results, requests) = \
440 self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
441 procedure, body, read_timeout)
442
443 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
444
445 assert not frozenset(results).intersection(requests)
446
447 return self._CombineResults(results, requests, procedure)
448
451 - def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
452 _req_process_fn=None):
461
462 @staticmethod
464 """Encode argument.
465
466 """
467 if argkind is None:
468 return value
469 else:
470 return encoder_fn(argkind)(value)
471
472 - def _Call(self, cdef, node_list, args):
473 """Entry point for automatically generated RPC wrappers.
474
475 """
476 (procedure, _, resolver_opts, timeout, argdefs,
477 prep_fn, postproc_fn, _) = cdef
478
479 if callable(timeout):
480 read_timeout = timeout(args)
481 else:
482 read_timeout = timeout
483
484 if callable(resolver_opts):
485 req_resolver_opts = resolver_opts(args)
486 else:
487 req_resolver_opts = resolver_opts
488
489 if len(args) != len(argdefs):
490 raise errors.ProgrammerError("Number of passed arguments doesn't match")
491
492 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
493 if prep_fn is None:
494
495
496 body = serializer.DumpJson(enc_args)
497 pnbody = dict((n, body) for n in node_list)
498 else:
499
500
501 assert callable(prep_fn)
502 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
503 for n in node_list)
504
505 result = self._proc(node_list, procedure, pnbody, read_timeout,
506 req_resolver_opts)
507
508 if postproc_fn:
509 return dict(map(lambda (key, value): (key, postproc_fn(value)),
510 result.items()))
511 else:
512 return result
513
516 """Converts an object to a dictionary.
517
518 @note: See L{objects}.
519
520 """
521 return value.ToDict()
522
525 """Converts a list of L{objects} to dictionaries.
526
527 """
528 return map(_ObjectToDict, value)
529
532 """Encodes a dictionary with node name as key and disk objects as values.
533
534 """
535 return dict((name, _ObjectListToDict(disks))
536 for name, disks in value.items())
537
540 """Loads a file and prepares it for an upload to nodes.
541
542 """
543 statcb = utils.FileStatHelper()
544 data = _Compress(utils.ReadFile(filename, preread=statcb))
545 st = statcb.st
546
547 if getents_fn is None:
548 getents_fn = runtime.GetEnts
549
550 getents = getents_fn()
551
552 virt_filename = vcluster.MakeVirtualPath(filename)
553
554 return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
555 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
556
559 """Encodes disks for finalizing export.
560
561 """
562 flat_disks = []
563
564 for disk in snap_disks:
565 if isinstance(disk, bool):
566 flat_disks.append(disk)
567 else:
568 flat_disks.append(disk.ToDict())
569
570 return flat_disks
571
574 """Encodes import/export I/O information.
575
576 """
577 if ieio == constants.IEIO_RAW_DISK:
578 assert len(ieioargs) == 1
579 return (ieio, (ieioargs[0].ToDict(), ))
580
581 if ieio == constants.IEIO_SCRIPT:
582 assert len(ieioargs) == 2
583 return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
584
585 return (ieio, ieioargs)
586
589 """Encodes information for renaming block devices.
590
591 """
592 return [(d.ToDict(), uid) for d, uid in value]
593
596 """Extracts the spindle information from the space info and adds
597 it to the result dictionary.
598
599 @type result: dict of strings
600 @param result: dictionary holding the result of the legacy node info
601 @type space_info: list of dicts of strings
602 @param space_info: list, each row holding space information of one storage
603 unit
604 @rtype: None
605 @return: does not return anything, manipulates the C{result} variable
606
607 """
608 lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
609 space_info, constants.ST_LVM_PV)
610 if lvm_pv_info:
611 result["spindles_free"] = lvm_pv_info["storage_free"]
612 result["spindles_total"] = lvm_pv_info["storage_size"]
613 else:
614 raise errors.OpExecError("No spindle storage information available.")
615
618 """Extracts the storage space information of the default storage type from
619 the space info and adds it to the result dictionary.
620
621 @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
622
623 """
624
625 no_defaults = (len(space_info) < 1) or \
626 (space_info[0]["type"] == constants.ST_LVM_PV and len(space_info) == 1)
627
628 default_space_info = None
629 if no_defaults:
630 logging.warning("No storage info provided for default storage type.")
631 else:
632 default_space_info = space_info[0]
633
634 if default_space_info:
635 result["name"] = default_space_info["name"]
636 result["storage_free"] = default_space_info["storage_free"]
637 result["storage_size"] = default_space_info["storage_size"]
638
641 """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
642
643 Converts the data into a single dictionary. This is fine for most use cases,
644 but some require information from more than one volume group or hypervisor.
645
646 @param require_spindles: add spindle storage information to the legacy node
647 info
648
649 """
650 (bootid, space_info, (hv_info, )) = data
651
652 ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
653
654 if require_spindles:
655 _AddSpindlesToLegacyNodeInfo(ret, space_info)
656 _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info)
657
658 return ret
659
662 """Annotates just DRBD disks layouts.
663
664 """
665 assert disk.dev_type == constants.DT_DRBD8
666
667 disk.params = objects.FillDict(drbd_params, disk.params)
668 (dev_data, dev_meta) = disk.children
669 dev_data.params = objects.FillDict(data_params, dev_data.params)
670 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
671
672 return disk
673
676 """Generic disk parameter annotation routine.
677
678 """
679 assert disk.dev_type != constants.DT_DRBD8
680
681 disk.params = objects.FillDict(params, disk.params)
682
683 return disk
684
687 """Annotates the disk objects with the disk parameters.
688
689 @param template: The disk template used
690 @param disks: The list of disks objects to annotate
691 @param disk_params: The disk paramaters for annotation
692 @returns: A list of disk objects annotated
693
694 """
695 ld_params = objects.Disk.ComputeLDParams(template, disk_params)
696
697 if template == constants.DT_DRBD8:
698 annotation_fn = _AnnotateDParamsDRBD
699 elif template == constants.DT_DISKLESS:
700 annotation_fn = lambda disk, _: disk
701 else:
702 annotation_fn = _AnnotateDParamsGeneric
703
704 return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
705
713
716 """Adds the exclusive storage flag to lvm units.
717
718 This function creates a copy of the storage_units lists, with the
719 es_flag being added to all lvm storage units.
720
721 @type storage_units: list of pairs (string, string)
722 @param storage_units: list of 'raw' storage units, consisting only of
723 (storage_type, storage_key)
724 @type es_flag: boolean
725 @param es_flag: exclusive storage flag
726 @rtype: list of tuples (string, string, list)
727 @return: list of storage units (storage_type, storage_key, params) with
728 the params containing the es_flag for lvm-vg storage units
729
730 """
731 result = []
732 for (storage_type, storage_key) in storage_units:
733 if storage_type in [constants.ST_LVM_VG, constants.ST_LVM_PV]:
734 result.append((storage_type, storage_key, [es_flag]))
735 else:
736 result.append((storage_type, storage_key, []))
737 return result
738
741 """Return the exclusive storage flag for all the given nodes.
742
743 @type cfg: L{config.ConfigWriter}
744 @param cfg: cluster configuration
745 @type node_uuids: list or tuple
746 @param node_uuids: node UUIDs for which to read the flag
747 @rtype: dict
748 @return: mapping from node uuids to exclusive storage flags
749 @raise errors.OpPrereqError: if any given node name has no corresponding
750 node
751
752 """
753 getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
754 flags = map(getflag, node_uuids)
755 return dict(zip(node_uuids, flags))
756
759 """Return the lvm storage unit for all the given nodes.
760
761 Main purpose of this function is to map the exclusive storage flag, which
762 can be different for each node, to the default LVM storage unit.
763
764 @type cfg: L{config.ConfigWriter}
765 @param cfg: cluster configuration
766 @type storage_units: list of pairs (string, string)
767 @param storage_units: list of 'raw' storage units, e.g. pairs of
768 (storage_type, storage_key)
769 @type node_uuids: list or tuple
770 @param node_uuids: node UUIDs for which to read the flag
771 @rtype: dict
772 @return: mapping from node uuids to a list of storage units which include
773 the exclusive storage flag for lvm storage
774 @raise errors.OpPrereqError: if any given node name has no corresponding
775 node
776
777 """
778 getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
779 storage_units, _GetExclusiveStorageFlag(cfg, n))
780 flags = map(getunit, node_uuids)
781 return dict(zip(node_uuids, flags))
782
783
784
785 _ENCODERS = {
786 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
787 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
788 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
789 rpc_defs.ED_COMPRESS: _Compress,
790 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
791 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
792 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
793 }
794
795
796 -class RpcRunner(_RpcClientBase,
797 _generated_rpc.RpcClientDefault,
798 _generated_rpc.RpcClientBootstrap,
799 _generated_rpc.RpcClientDnsOnly,
800 _generated_rpc.RpcClientConfig):
801 """RPC runner class.
802
803 """
804 - def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
805 """Initialized the RPC runner.
806
807 @type cfg: L{config.ConfigWriter}
808 @param cfg: Configuration
809 @type lock_monitor_cb: callable
810 @param lock_monitor_cb: Lock monitor callback
811
812 """
813 self._cfg = cfg
814
815 encoders = _ENCODERS.copy()
816
817 encoders.update({
818
819 rpc_defs.ED_INST_DICT: self._InstDict,
820 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
821 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
822 rpc_defs.ED_NIC_DICT: self._NicDict,
823
824
825 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
826 rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
827 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
828
829
830 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
831 })
832
833
834 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
835 cfg.GetAllNodesInfo)
836
837
838
839
840
841 _RpcClientBase.__init__(self, resolver, encoders.get,
842 lock_monitor_cb=lock_monitor_cb,
843 _req_process_fn=_req_process_fn)
844 _generated_rpc.RpcClientConfig.__init__(self)
845 _generated_rpc.RpcClientBootstrap.__init__(self)
846 _generated_rpc.RpcClientDnsOnly.__init__(self)
847 _generated_rpc.RpcClientDefault.__init__(self)
848
860
861 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
862 """Convert the given instance to a dict.
863
864 This is done via the instance's ToDict() method and additionally
865 we fill the hvparams with the cluster defaults.
866
867 @type instance: L{objects.Instance}
868 @param instance: an Instance object
869 @type hvp: dict or None
870 @param hvp: a dictionary with overridden hypervisor parameters
871 @type bep: dict or None
872 @param bep: a dictionary with overridden backend parameters
873 @type osp: dict or None
874 @param osp: a dictionary with overridden os parameters
875 @rtype: dict
876 @return: the instance dict, with the hvparams filled with the
877 cluster defaults
878
879 """
880 idict = instance.ToDict()
881 cluster = self._cfg.GetClusterInfo()
882 idict["hvparams"] = cluster.FillHV(instance)
883 if hvp is not None:
884 idict["hvparams"].update(hvp)
885 idict["beparams"] = cluster.FillBE(instance)
886 if bep is not None:
887 idict["beparams"].update(bep)
888 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
889 if osp is not None:
890 idict["osparams"].update(osp)
891 idict["disks"] = self._DisksDictDP((instance.disks, instance))
892 for nic in idict["nics"]:
893 nic["nicparams"] = objects.FillDict(
894 cluster.nicparams[constants.PP_DEFAULT],
895 nic["nicparams"])
896 network = nic.get("network", None)
897 if network:
898 net_uuid = self._cfg.LookupNetwork(network)
899 if net_uuid:
900 nobj = self._cfg.GetNetwork(net_uuid)
901 nic["netinfo"] = objects.Network.ToDict(nobj)
902 return idict
903
905 """Wrapper for L{_InstDict}.
906
907 """
908 return self._InstDict(instance, hvp=hvp, bep=bep)
909
911 """Wrapper for L{_InstDict}.
912
913 """
914 return self._InstDict(instance, osp=osparams)
915
924
926 """Wrapper for L{AnnotateDiskParams}.
927
928 Supports a list of (disk, instance) tuples.
929 """
930 return [disk for disk_inst in disks_insts
931 for disk in self._DisksDictDP(disk_inst)]
932
934 """Wrapper for L{AnnotateDiskParams}.
935
936 """
937 (anno_disk,) = self._DisksDictDP(([disk], instance))
938 return anno_disk
939
940
941 -class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
942 """RPC wrappers for job queue.
943
944 """
945 - def __init__(self, context, address_list):
958
959
960 -class BootstrapRunner(_RpcClientBase,
961 _generated_rpc.RpcClientBootstrap,
962 _generated_rpc.RpcClientDnsOnly):
963 """RPC wrappers for bootstrapping.
964
965 """
978
979
980 -class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
981 """RPC wrappers for calls using only DNS.
982
983 """
991
992
993 -class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
994 """RPC wrappers for L{config}.
995
996 """
997 - def __init__(self, context, address_list, _req_process_fn=None,
998 _getents=None):
1023