Package ganeti :: Module rpc
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Inter-node RPC library. 
 23   
 24  """ 
 25   
 26  # pylint: disable=C0103,R0201,R0904 
 27  # C0103: Invalid name, since call_ are not valid 
 28  # R0201: Method could be a function, we keep all rpcs instance methods 
 29  # as not to change them back and forth between static/instance methods 
 30  # if they need to start using instance attributes 
 31  # R0904: Too many public methods 
 32   
 33  import logging 
 34  import zlib 
 35  import base64 
 36  import pycurl 
 37  import threading 
 38   
 39  from ganeti import utils 
 40  from ganeti import objects 
 41  from ganeti import http 
 42  from ganeti import serializer 
 43  from ganeti import constants 
 44  from ganeti import errors 
 45  from ganeti import netutils 
 46  from ganeti import ssconf 
 47  from ganeti import runtime 
 48  from ganeti import compat 
 49  from ganeti import rpc_defs 
 50   
 51  # Special module generated at build time 
 52  from ganeti import _generated_rpc 
 53   
 54  # pylint has a bug here, doesn't see this import 
 55  import ganeti.http.client  # pylint: disable=W0611 
 56   
 57   
 58  # Timeout for connecting to nodes (seconds) 
 59  _RPC_CONNECT_TIMEOUT = 5 
 60   
 61  _RPC_CLIENT_HEADERS = [ 
 62    "Content-type: %s" % http.HTTP_APP_JSON, 
 63    "Expect:", 
 64    ] 
 65   
 66  # Various time constants for the timeout table 
 67  _TMO_URGENT = 60 # one minute 
 68  _TMO_FAST = 5 * 60 # five minutes 
 69  _TMO_NORMAL = 15 * 60 # 15 minutes 
 70  _TMO_SLOW = 3600 # one hour 
 71  _TMO_4HRS = 4 * 3600 
 72  _TMO_1DAY = 86400 
 73   
 74  #: Special value to describe an offline host 
 75  _OFFLINE = object() 
76 77 78 -def Init():
79 """Initializes the module-global HTTP client manager. 80 81 Must be called before using any RPC function and while exactly one thread is 82 running. 83 84 """ 85 # curl_global_init(3) and curl_global_cleanup(3) must be called with only 86 # one thread running. This check is just a safety measure -- it doesn't 87 # cover all cases. 88 assert threading.activeCount() == 1, \ 89 "Found more than one active thread when initializing pycURL" 90 91 logging.info("Using PycURL %s", pycurl.version) 92 93 pycurl.global_init(pycurl.GLOBAL_ALL)
94
95 96 -def Shutdown():
97 """Stops the module-global HTTP client manager. 98 99 Must be called before quitting the program and while exactly one thread is 100 running. 101 102 """ 103 pycurl.global_cleanup()
104
105 106 -def _ConfigRpcCurl(curl):
107 noded_cert = str(constants.NODED_CERT_FILE) 108 109 curl.setopt(pycurl.FOLLOWLOCATION, False) 110 curl.setopt(pycurl.CAINFO, noded_cert) 111 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 112 curl.setopt(pycurl.SSL_VERIFYPEER, True) 113 curl.setopt(pycurl.SSLCERTTYPE, "PEM") 114 curl.setopt(pycurl.SSLCERT, noded_cert) 115 curl.setopt(pycurl.SSLKEYTYPE, "PEM") 116 curl.setopt(pycurl.SSLKEY, noded_cert) 117 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118
119 120 -def RunWithRPC(fn):
121 """RPC-wrapper decorator. 122 123 When applied to a function, it runs it with the RPC system 124 initialized, and it shutsdown the system afterwards. This means the 125 function must be called without RPC being initialized. 126 127 """ 128 def wrapper(*args, **kwargs): 129 Init() 130 try: 131 return fn(*args, **kwargs) 132 finally: 133 Shutdown()
134 return wrapper 135
136 137 -def _Compress(data):
138 """Compresses a string for transport over RPC. 139 140 Small amounts of data are not compressed. 141 142 @type data: str 143 @param data: Data 144 @rtype: tuple 145 @return: Encoded data to send 146 147 """ 148 # Small amounts of data are not compressed 149 if len(data) < 512: 150 return (constants.RPC_ENCODING_NONE, data) 151 152 # Compress with zlib and encode in base64 153 return (constants.RPC_ENCODING_ZLIB_BASE64, 154 base64.b64encode(zlib.compress(data, 3)))
155
156 157 -class RpcResult(object):
158 """RPC Result class. 159 160 This class holds an RPC result. It is needed since in multi-node 161 calls we can't raise an exception just because one one out of many 162 failed, and therefore we use this class to encapsulate the result. 163 164 @ivar data: the data payload, for successful results, or None 165 @ivar call: the name of the RPC call 166 @ivar node: the name of the node to which we made the call 167 @ivar offline: whether the operation failed because the node was 168 offline, as opposed to actual failure; offline=True will always 169 imply failed=True, in order to allow simpler checking if 170 the user doesn't care about the exact failure mode 171 @ivar fail_msg: the error message if the call failed 172 173 """
174 - def __init__(self, data=None, failed=False, offline=False, 175 call=None, node=None):
176 self.offline = offline 177 self.call = call 178 self.node = node 179 180 if offline: 181 self.fail_msg = "Node is marked offline" 182 self.data = self.payload = None 183 elif failed: 184 self.fail_msg = self._EnsureErr(data) 185 self.data = self.payload = None 186 else: 187 self.data = data 188 if not isinstance(self.data, (tuple, list)): 189 self.fail_msg = ("RPC layer error: invalid result type (%s)" % 190 type(self.data)) 191 self.payload = None 192 elif len(data) != 2: 193 self.fail_msg = ("RPC layer error: invalid result length (%d), " 194 "expected 2" % len(self.data)) 195 self.payload = None 196 elif not self.data[0]: 197 self.fail_msg = self._EnsureErr(self.data[1]) 198 self.payload = None 199 else: 200 # finally success 201 self.fail_msg = None 202 self.payload = data[1] 203 204 for attr_name in ["call", "data", "fail_msg", 205 "node", "offline", "payload"]: 206 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207 208 @staticmethod
209 - def _EnsureErr(val):
210 """Helper to ensure we return a 'True' value for error.""" 211 if val: 212 return val 213 else: 214 return "No error information"
215
216 - def Raise(self, msg, prereq=False, ecode=None):
217 """If the result has failed, raise an OpExecError. 218 219 This is used so that LU code doesn't have to check for each 220 result, but instead can call this function. 221 222 """ 223 if not self.fail_msg: 224 return 225 226 if not msg: # one could pass None for default message 227 msg = ("Call '%s' to node '%s' has failed: %s" % 228 (self.call, self.node, self.fail_msg)) 229 else: 230 msg = "%s: %s" % (msg, self.fail_msg) 231 if prereq: 232 ec = errors.OpPrereqError 233 else: 234 ec = errors.OpExecError 235 if ecode is not None: 236 args = (msg, ecode) 237 else: 238 args = (msg, ) 239 raise ec(*args) # pylint: disable=W0142
240
241 242 -def _SsconfResolver(ssconf_ips, node_list, _, 243 ssc=ssconf.SimpleStore, 244 nslookup_fn=netutils.Hostname.GetIP):
245 """Return addresses for given node names. 246 247 @type ssconf_ips: bool 248 @param ssconf_ips: Use the ssconf IPs 249 @type node_list: list 250 @param node_list: List of node names 251 @type ssc: class 252 @param ssc: SimpleStore class that is used to obtain node->ip mappings 253 @type nslookup_fn: callable 254 @param nslookup_fn: function use to do NS lookup 255 @rtype: list of tuple; (string, string) 256 @return: List of tuples containing node name and IP address 257 258 """ 259 ss = ssc() 260 family = ss.GetPrimaryIPFamily() 261 262 if ssconf_ips: 263 iplist = ss.GetNodePrimaryIPList() 264 ipmap = dict(entry.split() for entry in iplist) 265 else: 266 ipmap = {} 267 268 result = [] 269 for node in node_list: 270 ip = ipmap.get(node) 271 if ip is None: 272 ip = nslookup_fn(node, family=family) 273 result.append((node, ip)) 274 275 return result
276
277 278 -class _StaticResolver:
279 - def __init__(self, addresses):
280 """Initializes this class. 281 282 """ 283 self._addresses = addresses
284
285 - def __call__(self, hosts, _):
286 """Returns static addresses for hosts. 287 288 """ 289 assert len(hosts) == len(self._addresses) 290 return zip(hosts, self._addresses)
291
292 293 -def _CheckConfigNode(name, node, accept_offline_node):
294 """Checks if a node is online. 295 296 @type name: string 297 @param name: Node name 298 @type node: L{objects.Node} or None 299 @param node: Node object 300 301 """ 302 if node is None: 303 # Depend on DNS for name resolution 304 ip = name 305 elif node.offline and not accept_offline_node: 306 ip = _OFFLINE 307 else: 308 ip = node.primary_ip 309 return (name, ip)
310
311 312 -def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
313 """Calculate node addresses using configuration. 314 315 """ 316 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE) 317 318 assert accept_offline_node or opts is None, "Unknown option" 319 320 # Special case for single-host lookups 321 if len(hosts) == 1: 322 (name, ) = hosts 323 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)] 324 else: 325 all_nodes = all_nodes_fn() 326 return [_CheckConfigNode(name, all_nodes.get(name, None), 327 accept_offline_node) 328 for name in hosts]
329
330 331 -class _RpcProcessor:
332 - def __init__(self, resolver, port, lock_monitor_cb=None):
333 """Initializes this class. 334 335 @param resolver: callable accepting a list of hostnames, returning a list 336 of tuples containing name and IP address (IP address can be the name or 337 the special value L{_OFFLINE} to mark offline machines) 338 @type port: int 339 @param port: TCP port 340 @param lock_monitor_cb: Callable for registering with lock monitor 341 342 """ 343 self._resolver = resolver 344 self._port = port 345 self._lock_monitor_cb = lock_monitor_cb
346 347 @staticmethod
348 - def _PrepareRequests(hosts, port, procedure, body, read_timeout):
349 """Prepares requests by sorting offline hosts into separate list. 350 351 @type body: dict 352 @param body: a dictionary with per-host body data 353 354 """ 355 results = {} 356 requests = {} 357 358 assert isinstance(body, dict) 359 assert len(body) == len(hosts) 360 assert compat.all(isinstance(v, str) for v in body.values()) 361 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \ 362 "%s != %s" % (hosts, body.keys()) 363 364 for (name, ip) in hosts: 365 if ip is _OFFLINE: 366 # Node is marked as offline 367 results[name] = RpcResult(node=name, offline=True, call=procedure) 368 else: 369 requests[name] = \ 370 http.client.HttpClientRequest(str(ip), port, 371 http.HTTP_POST, str("/%s" % procedure), 372 headers=_RPC_CLIENT_HEADERS, 373 post_data=body[name], 374 read_timeout=read_timeout, 375 nicename="%s/%s" % (name, procedure), 376 curl_config_fn=_ConfigRpcCurl) 377 378 return (results, requests)
379 380 @staticmethod
381 - def _CombineResults(results, requests, procedure):
382 """Combines pre-computed results for offline hosts with actual call results. 383 384 """ 385 for name, req in requests.items(): 386 if req.success and req.resp_status_code == http.HTTP_OK: 387 host_result = RpcResult(data=serializer.LoadJson(req.resp_body), 388 node=name, call=procedure) 389 else: 390 # TODO: Better error reporting 391 if req.error: 392 msg = req.error 393 else: 394 msg = req.resp_body 395 396 logging.error("RPC error in %s on node %s: %s", procedure, name, msg) 397 host_result = RpcResult(data=msg, failed=True, node=name, 398 call=procedure) 399 400 results[name] = host_result 401 402 return results
403
404 - def __call__(self, hosts, procedure, body, read_timeout, resolver_opts, 405 _req_process_fn=None):
406 """Makes an RPC request to a number of nodes. 407 408 @type hosts: sequence 409 @param hosts: Hostnames 410 @type procedure: string 411 @param procedure: Request path 412 @type body: dictionary 413 @param body: dictionary with request bodies per host 414 @type read_timeout: int or None 415 @param read_timeout: Read timeout for request 416 417 """ 418 assert read_timeout is not None, \ 419 "Missing RPC read timeout for procedure '%s'" % procedure 420 421 if _req_process_fn is None: 422 _req_process_fn = http.client.ProcessRequests 423 424 (results, requests) = \ 425 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port, 426 procedure, body, read_timeout) 427 428 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb) 429 430 assert not frozenset(results).intersection(requests) 431 432 return self._CombineResults(results, requests, procedure)
433
434 435 -class _RpcClientBase:
436 - def __init__(self, resolver, encoder_fn, lock_monitor_cb=None, 437 _req_process_fn=None):
438 """Initializes this class. 439 440 """ 441 proc = _RpcProcessor(resolver, 442 netutils.GetDaemonPort(constants.NODED), 443 lock_monitor_cb=lock_monitor_cb) 444 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn) 445 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
446 447 @staticmethod
448 - def _EncodeArg(encoder_fn, (argkind, value)):
449 """Encode argument. 450 451 """ 452 if argkind is None: 453 return value 454 else: 455 return encoder_fn(argkind)(value)
456
457 - def _Call(self, cdef, node_list, args):
458 """Entry point for automatically generated RPC wrappers. 459 460 """ 461 (procedure, _, resolver_opts, timeout, argdefs, 462 prep_fn, postproc_fn, _) = cdef 463 464 if callable(timeout): 465 read_timeout = timeout(args) 466 else: 467 read_timeout = timeout 468 469 if callable(resolver_opts): 470 req_resolver_opts = resolver_opts(args) 471 else: 472 req_resolver_opts = resolver_opts 473 474 if len(args) != len(argdefs): 475 raise errors.ProgrammerError("Number of passed arguments doesn't match") 476 477 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args)) 478 if prep_fn is None: 479 # for a no-op prep_fn, we serialise the body once, and then we 480 # reuse it in the dictionary values 481 body = serializer.DumpJson(enc_args) 482 pnbody = dict((n, body) for n in node_list) 483 else: 484 # for a custom prep_fn, we pass the encoded arguments and the 485 # node name to the prep_fn, and we serialise its return value 486 assert callable(prep_fn) 487 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args))) 488 for n in node_list) 489 490 result = self._proc(node_list, procedure, pnbody, read_timeout, 491 req_resolver_opts) 492 493 if postproc_fn: 494 return dict(map(lambda (key, value): (key, postproc_fn(value)), 495 result.items())) 496 else: 497 return result
498
499 500 -def _ObjectToDict(value):
501 """Converts an object to a dictionary. 502 503 @note: See L{objects}. 504 505 """ 506 return value.ToDict()
507
508 509 -def _ObjectListToDict(value):
510 """Converts a list of L{objects} to dictionaries. 511 512 """ 513 return map(_ObjectToDict, value)
514
515 516 -def _EncodeNodeToDiskDict(value):
517 """Encodes a dictionary with node name as key and disk objects as values. 518 519 """ 520 return dict((name, _ObjectListToDict(disks)) 521 for name, disks in value.items())
522
523 524 -def _PrepareFileUpload(getents_fn, filename):
525 """Loads a file and prepares it for an upload to nodes. 526 527 """ 528 statcb = utils.FileStatHelper() 529 data = _Compress(utils.ReadFile(filename, preread=statcb)) 530 st = statcb.st 531 532 if getents_fn is None: 533 getents_fn = runtime.GetEnts 534 535 getents = getents_fn() 536 537 return [filename, data, st.st_mode, getents.LookupUid(st.st_uid), 538 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
539
540 541 -def _PrepareFinalizeExportDisks(snap_disks):
542 """Encodes disks for finalizing export. 543 544 """ 545 flat_disks = [] 546 547 for disk in snap_disks: 548 if isinstance(disk, bool): 549 flat_disks.append(disk) 550 else: 551 flat_disks.append(disk.ToDict()) 552 553 return flat_disks
554
555 556 -def _EncodeImportExportIO((ieio, ieioargs)):
557 """Encodes import/export I/O information. 558 559 """ 560 if ieio == constants.IEIO_RAW_DISK: 561 assert len(ieioargs) == 1 562 return (ieio, (ieioargs[0].ToDict(), )) 563 564 if ieio == constants.IEIO_SCRIPT: 565 assert len(ieioargs) == 2 566 return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) 567 568 return (ieio, ieioargs)
569
570 571 -def _EncodeBlockdevRename(value):
572 """Encodes information for renaming block devices. 573 574 """ 575 return [(d.ToDict(), uid) for d, uid in value]
576
577 578 -def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
579 """Annotates just DRBD disks layouts. 580 581 """ 582 assert disk.dev_type == constants.LD_DRBD8 583 584 disk.params = objects.FillDict(drbd_params, disk.params) 585 (dev_data, dev_meta) = disk.children 586 dev_data.params = objects.FillDict(data_params, dev_data.params) 587 dev_meta.params = objects.FillDict(meta_params, dev_meta.params) 588 589 return disk
590
591 592 -def _AnnotateDParamsGeneric(disk, (params, )):
593 """Generic disk parameter annotation routine. 594 595 """ 596 assert disk.dev_type != constants.LD_DRBD8 597 598 disk.params = objects.FillDict(params, disk.params) 599 600 return disk
601
602 603 -def AnnotateDiskParams(template, disks, disk_params):
604 """Annotates the disk objects with the disk parameters. 605 606 @param template: The disk template used 607 @param disks: The list of disks objects to annotate 608 @param disk_params: The disk paramaters for annotation 609 @returns: A list of disk objects annotated 610 611 """ 612 ld_params = objects.Disk.ComputeLDParams(template, disk_params) 613 614 if template == constants.DT_DRBD8: 615 annotation_fn = _AnnotateDParamsDRBD 616 elif template == constants.DT_DISKLESS: 617 annotation_fn = lambda disk, _: disk 618 else: 619 annotation_fn = _AnnotateDParamsGeneric 620 621 new_disks = [] 622 for disk in disks: 623 new_disks.append(annotation_fn(disk.Copy(), ld_params)) 624 625 return new_disks
626 627 628 #: Generic encoders 629 _ENCODERS = { 630 rpc_defs.ED_OBJECT_DICT: _ObjectToDict, 631 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict, 632 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict, 633 rpc_defs.ED_COMPRESS: _Compress, 634 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks, 635 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO, 636 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename, 637 }
638 639 640 -class RpcRunner(_RpcClientBase, 641 _generated_rpc.RpcClientDefault, 642 _generated_rpc.RpcClientBootstrap, 643 _generated_rpc.RpcClientDnsOnly, 644 _generated_rpc.RpcClientConfig):
645 """RPC runner class. 646 647 """
648 - def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
649 """Initialized the RPC runner. 650 651 @type cfg: L{config.ConfigWriter} 652 @param cfg: Configuration 653 @type lock_monitor_cb: callable 654 @param lock_monitor_cb: Lock monitor callback 655 656 """ 657 self._cfg = cfg 658 659 encoders = _ENCODERS.copy() 660 661 encoders.update({ 662 # Encoders requiring configuration object 663 rpc_defs.ED_INST_DICT: self._InstDict, 664 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp, 665 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp, 666 667 # Encoders annotating disk parameters 668 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP, 669 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP, 670 671 # Encoders with special requirements 672 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents), 673 }) 674 675 # Resolver using configuration 676 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo, 677 cfg.GetAllNodesInfo) 678 679 # Pylint doesn't recognize multiple inheritance properly, see 680 # <http://www.logilab.org/ticket/36586> and 681 # <http://www.logilab.org/ticket/35642> 682 # pylint: disable=W0233 683 _RpcClientBase.__init__(self, resolver, encoders.get, 684 lock_monitor_cb=lock_monitor_cb, 685 _req_process_fn=_req_process_fn) 686 _generated_rpc.RpcClientConfig.__init__(self) 687 _generated_rpc.RpcClientBootstrap.__init__(self) 688 _generated_rpc.RpcClientDnsOnly.__init__(self) 689 _generated_rpc.RpcClientDefault.__init__(self)
690
691 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
692 """Convert the given instance to a dict. 693 694 This is done via the instance's ToDict() method and additionally 695 we fill the hvparams with the cluster defaults. 696 697 @type instance: L{objects.Instance} 698 @param instance: an Instance object 699 @type hvp: dict or None 700 @param hvp: a dictionary with overridden hypervisor parameters 701 @type bep: dict or None 702 @param bep: a dictionary with overridden backend parameters 703 @type osp: dict or None 704 @param osp: a dictionary with overridden os parameters 705 @rtype: dict 706 @return: the instance dict, with the hvparams filled with the 707 cluster defaults 708 709 """ 710 idict = instance.ToDict() 711 cluster = self._cfg.GetClusterInfo() 712 idict["hvparams"] = cluster.FillHV(instance) 713 if hvp is not None: 714 idict["hvparams"].update(hvp) 715 idict["beparams"] = cluster.FillBE(instance) 716 if bep is not None: 717 idict["beparams"].update(bep) 718 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams) 719 if osp is not None: 720 idict["osparams"].update(osp) 721 for nic in idict["nics"]: 722 nic["nicparams"] = objects.FillDict( 723 cluster.nicparams[constants.PP_DEFAULT], 724 nic["nicparams"]) 725 idict["disks"] = self._DisksDictDP((instance.disks, instance)) 726 return idict
727
728 - def _InstDictHvpBepDp(self, (instance, hvp, bep)):
729 """Wrapper for L{_InstDict}. 730 731 """ 732 return self._InstDict(instance, hvp=hvp, bep=bep)
733
734 - def _InstDictOspDp(self, (instance, osparams)):
735 """Wrapper for L{_InstDict}. 736 737 """ 738 return self._InstDict(instance, osp=osparams)
739
740 - def _DisksDictDP(self, (disks, instance)):
741 """Wrapper for L{AnnotateDiskParams}. 742 743 """ 744 diskparams = self._cfg.GetInstanceDiskParams(instance) 745 return [disk.ToDict() 746 for disk in AnnotateDiskParams(instance.disk_template, 747 disks, diskparams)]
748
749 - def _SingleDiskDictDP(self, (disk, instance)):
750 """Wrapper for L{AnnotateDiskParams}. 751 752 """ 753 (anno_disk,) = self._DisksDictDP(([disk], instance)) 754 return anno_disk
755
756 757 -class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
758 """RPC wrappers for job queue. 759 760 """
761 - def __init__(self, context, address_list):
762 """Initializes this class. 763 764 """ 765 if address_list is None: 766 resolver = compat.partial(_SsconfResolver, True) 767 else: 768 # Caller provided an address list 769 resolver = _StaticResolver(address_list) 770 771 _RpcClientBase.__init__(self, resolver, _ENCODERS.get, 772 lock_monitor_cb=context.glm.AddToLockMonitor) 773 _generated_rpc.RpcClientJobQueue.__init__(self)
774
775 776 -class BootstrapRunner(_RpcClientBase, 777 _generated_rpc.RpcClientBootstrap, 778 _generated_rpc.RpcClientDnsOnly):
779 """RPC wrappers for bootstrapping. 780 781 """
782 - def __init__(self):
783 """Initializes this class. 784 785 """ 786 # Pylint doesn't recognize multiple inheritance properly, see 787 # <http://www.logilab.org/ticket/36586> and 788 # <http://www.logilab.org/ticket/35642> 789 # pylint: disable=W0233 790 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True), 791 _ENCODERS.get) 792 _generated_rpc.RpcClientBootstrap.__init__(self) 793 _generated_rpc.RpcClientDnsOnly.__init__(self)
794
795 796 -class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797 """RPC wrappers for calls using only DNS. 798 799 """
800 - def __init__(self):
801 """Initialize this class. 802 803 """ 804 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False), 805 _ENCODERS.get) 806 _generated_rpc.RpcClientDnsOnly.__init__(self)
807
808 809 -class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810 """RPC wrappers for L{config}. 811 812 """
813 - def __init__(self, context, address_list, _req_process_fn=None, 814 _getents=None):
815 """Initializes this class. 816 817 """ 818 if context: 819 lock_monitor_cb = context.glm.AddToLockMonitor 820 else: 821 lock_monitor_cb = None 822 823 if address_list is None: 824 resolver = compat.partial(_SsconfResolver, True) 825 else: 826 # Caller provided an address list 827 resolver = _StaticResolver(address_list) 828 829 encoders = _ENCODERS.copy() 830 831 encoders.update({ 832 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents), 833 }) 834 835 _RpcClientBase.__init__(self, resolver, encoders.get, 836 lock_monitor_cb=lock_monitor_cb, 837 _req_process_fn=_req_process_fn) 838 _generated_rpc.RpcClientConfig.__init__(self)
839