Source Code for Module ganeti.rpc

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Inter-node RPC library. 
 23   
 24  """ 
 25   
 26  # pylint: disable=C0103,R0201,R0904 
 27  # C0103: Invalid name, since call_* names are not valid
 28  # R0201: Method could be a function; we keep all RPCs as instance methods
 29  # so as not to change them back and forth between static/instance methods
 30  # if they need to start using instance attributes 
 31  # R0904: Too many public methods 
 32   
 33  import logging 
 34  import zlib 
 35  import base64 
 36  import pycurl 
 37  import threading 
 38  import copy 
 39   
 40  from ganeti import utils 
 41  from ganeti import objects 
 42  from ganeti import http 
 43  from ganeti import serializer 
 44  from ganeti import constants 
 45  from ganeti import errors 
 46  from ganeti import netutils 
 47  from ganeti import ssconf 
 48  from ganeti import runtime 
 49  from ganeti import compat 
 50  from ganeti import rpc_defs 
 51  from ganeti import pathutils 
 52  from ganeti import vcluster 
 53   
 54  # Special module generated at build time 
 55  from ganeti import _generated_rpc 
 56   
 57  # pylint has a bug here, doesn't see this import 
 58  import ganeti.http.client  # pylint: disable=W0611 
 59   
 60   
 61  _RPC_CLIENT_HEADERS = [ 
 62    "Content-type: %s" % http.HTTP_APP_JSON, 
 63    "Expect:", 
 64    ] 
 65   
 66  #: Special value to describe an offline host 
 67  _OFFLINE = object() 
 68  
 69  
 70  def Init():
 71    """Initializes the module-global HTTP client manager.
 72  
 73    Must be called before using any RPC function and while exactly one thread is
 74    running.
 75  
 76    """
 77    # curl_global_init(3) and curl_global_cleanup(3) must be called with only
 78    # one thread running. This check is just a safety measure -- it doesn't
 79    # cover all cases.
 80    assert threading.activeCount() == 1, \
 81      "Found more than one active thread when initializing pycURL"
 82  
 83    logging.info("Using PycURL %s", pycurl.version)
 84  
 85    pycurl.global_init(pycurl.GLOBAL_ALL)
 86  
 87  
 88  def Shutdown():
 89    """Stops the module-global HTTP client manager.
 90  
 91    Must be called before quitting the program and while exactly one thread is
 92    running.
 93  
 94    """
 95    pycurl.global_cleanup()
 96  
 97  
 98  def _ConfigRpcCurl(curl):
 99    noded_cert = str(pathutils.NODED_CERT_FILE)
100  
101    curl.setopt(pycurl.FOLLOWLOCATION, False)
102    curl.setopt(pycurl.CAINFO, noded_cert)
103    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104    curl.setopt(pycurl.SSL_VERIFYPEER, True)
105    curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106    curl.setopt(pycurl.SSLCERT, noded_cert)
107    curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108    curl.setopt(pycurl.SSLKEY, noded_cert)
109    curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110  
111  
112  def RunWithRPC(fn):
113    """RPC-wrapper decorator.
114  
115    When applied to a function, it runs it with the RPC system
116    initialized, and it shuts down the system afterwards. This means the
117    function must be called without RPC being initialized.
118  
119    """
120    def wrapper(*args, **kwargs):
121      Init()
122      try:
123        return fn(*args, **kwargs)
124      finally:
125        Shutdown()
126    return wrapper
127  
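A minimal usage sketch of the decorator (the wrapped function and the work it does are hypothetical): Init() runs before the call and Shutdown() runs afterwards, even if the function raises.

    @RunWithRPC
    def Main():
      # RPC is already initialized here; Shutdown() runs in the wrapper's
      # "finally" block, even on exceptions.
      return DoSomethingNeedingRpc()  # placeholder for real work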
128  
129  def _Compress(data):
130    """Compresses a string for transport over RPC.
131  
132    Small amounts of data are not compressed.
133  
134    @type data: str
135    @param data: Data
136    @rtype: tuple
137    @return: Encoded data to send
138  
139    """
140    # Small amounts of data are not compressed
141    if len(data) < 512:
142      return (constants.RPC_ENCODING_NONE, data)
143  
144    # Compress with zlib and encode in base64
145    return (constants.RPC_ENCODING_ZLIB_BASE64,
146            base64.b64encode(zlib.compress(data, 3)))
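For illustration only, the two shapes _Compress can return (payloads shortened):

    _Compress("x" * 100)   # -> (constants.RPC_ENCODING_NONE, "xxx...")
    _Compress("x" * 4096)  # -> (constants.RPC_ENCODING_ZLIB_BASE64,
                           #     <base64 of the zlib level-3 output>)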
147  
148  
149  class RpcResult(object):
150    """RPC Result class.
151  
152    This class holds an RPC result. It is needed since in multi-node
153    calls we can't raise an exception just because one out of many
154    failed, and therefore we use this class to encapsulate the result.
155  
156    @ivar data: the data payload, for successful results, or None
157    @ivar call: the name of the RPC call
158    @ivar node: the name of the node to which we made the call
159    @ivar offline: whether the operation failed because the node was
160        offline, as opposed to actual failure; offline=True will always
161        imply failed=True, in order to allow simpler checking if
162        the user doesn't care about the exact failure mode
163    @ivar fail_msg: the error message if the call failed
164  
165    """
166    def __init__(self, data=None, failed=False, offline=False,
167                 call=None, node=None):
168      self.offline = offline
169      self.call = call
170      self.node = node
171  
172      if offline:
173        self.fail_msg = "Node is marked offline"
174        self.data = self.payload = None
175      elif failed:
176        self.fail_msg = self._EnsureErr(data)
177        self.data = self.payload = None
178      else:
179        self.data = data
180        if not isinstance(self.data, (tuple, list)):
181          self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182                           type(self.data))
183          self.payload = None
184        elif len(data) != 2:
185          self.fail_msg = ("RPC layer error: invalid result length (%d), "
186                           "expected 2" % len(self.data))
187          self.payload = None
188        elif not self.data[0]:
189          self.fail_msg = self._EnsureErr(self.data[1])
190          self.payload = None
191        else:
192          # finally success
193          self.fail_msg = None
194          self.payload = data[1]
195  
196      for attr_name in ["call", "data", "fail_msg",
197                        "node", "offline", "payload"]:
198        assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199  
200    @staticmethod
201    def _EnsureErr(val):
202      """Helper to ensure we return a 'True' value for error."""
203      if val:
204        return val
205      else:
206        return "No error information"
207  
208    def Raise(self, msg, prereq=False, ecode=None):
209      """If the result has failed, raise an OpExecError.
210  
211      This is used so that LU code doesn't have to check for each
212      result, but instead can call this function.
213  
214      """
215      if not self.fail_msg:
216        return
217  
218      if not msg: # one could pass None for default message
219        msg = ("Call '%s' to node '%s' has failed: %s" %
220               (self.call, self.node, self.fail_msg))
221      else:
222        msg = "%s: %s" % (msg, self.fail_msg)
223      if prereq:
224        ec = errors.OpPrereqError
225      else:
226        ec = errors.OpExecError
227      if ecode is not None:
228        args = (msg, ecode)
229      else:
230        args = (msg, )
231      raise ec(*args) # pylint: disable=W0142
232  
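A hedged sketch of how calling code typically consumes these results (the wrapper name, node list and helper are illustrative): multi-node wrappers return a dict mapping node names to RpcResult, and the caller either inspects fail_msg or lets Raise() abort the logical unit.

    results = runner.call_some_procedure(node_names, args)  # hypothetical wrapper
    for node, res in results.items():
      if res.offline:
        continue                    # offline nodes already carry a fail_msg
      res.Raise("Procedure failed on node %s" % node)
      process(res.payload)          # payload is the second element of the reply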
233  
234  def _SsconfResolver(ssconf_ips, node_list, _,
235                      ssc=ssconf.SimpleStore,
236                      nslookup_fn=netutils.Hostname.GetIP):
237    """Return addresses for given node names.
238  
239    @type ssconf_ips: bool
240    @param ssconf_ips: Use the ssconf IPs
241    @type node_list: list
242    @param node_list: List of node names
243    @type ssc: class
244    @param ssc: SimpleStore class that is used to obtain node->ip mappings
245    @type nslookup_fn: callable
246    @param nslookup_fn: function used to do NS lookups
247    @rtype: list of tuple; (string, string)
248    @return: List of tuples containing node name and IP address
249  
250    """
251    ss = ssc()
252    family = ss.GetPrimaryIPFamily()
253  
254    if ssconf_ips:
255      iplist = ss.GetNodePrimaryIPList()
256      ipmap = dict(entry.split() for entry in iplist)
257    else:
258      ipmap = {}
259  
260    result = []
261    for node in node_list:
262      ip = ipmap.get(node)
263      if ip is None:
264        ip = nslookup_fn(node, family=family)
265      result.append((node, ip))
266  
267    return result
268  
269  
270  class _StaticResolver:
271    def __init__(self, addresses):
272      """Initializes this class.
273  
274      """
275      self._addresses = addresses
276  
277    def __call__(self, hosts, _):
278      """Returns static addresses for hosts.
279  
280      """
281      assert len(hosts) == len(self._addresses)
282      return zip(hosts, self._addresses)
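A small illustration of the resolver contract (the addresses and host names below are example values): the callable maps a host list onto (name, address) pairs in order.

    resolver = _StaticResolver(["192.0.2.10", "192.0.2.11"])
    resolver(["node1.example.com", "node2.example.com"], None)
    # -> [("node1.example.com", "192.0.2.10"), ("node2.example.com", "192.0.2.11")]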
283  
284  
285  def _CheckConfigNode(name, node, accept_offline_node):
286    """Checks if a node is online.
287  
288    @type name: string
289    @param name: Node name
290    @type node: L{objects.Node} or None
291    @param node: Node object
292  
293    """
294    if node is None:
295      # Depend on DNS for name resolution
296      ip = name
297    elif node.offline and not accept_offline_node:
298      ip = _OFFLINE
299    else:
300      ip = node.primary_ip
301    return (name, ip)
302  
303  
304  def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305    """Calculate node addresses using configuration.
306  
307    """
308    accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309  
310    assert accept_offline_node or opts is None, "Unknown option"
311  
312    # Special case for single-host lookups
313    if len(hosts) == 1:
314      (name, ) = hosts
315      return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316    else:
317      all_nodes = all_nodes_fn()
318      return [_CheckConfigNode(name, all_nodes.get(name, None),
319                               accept_offline_node)
320              for name in hosts]
321  
322  
323  class _RpcProcessor:
324    def __init__(self, resolver, port, lock_monitor_cb=None):
325      """Initializes this class.
326  
327      @param resolver: callable accepting a list of hostnames, returning a list
328        of tuples containing name and IP address (IP address can be the name or
329        the special value L{_OFFLINE} to mark offline machines)
330      @type port: int
331      @param port: TCP port
332      @param lock_monitor_cb: Callable for registering with lock monitor
333  
334      """
335      self._resolver = resolver
336      self._port = port
337      self._lock_monitor_cb = lock_monitor_cb
338  
339    @staticmethod
340    def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341      """Prepares requests by sorting offline hosts into a separate list.
342  
343      @type body: dict
344      @param body: a dictionary with per-host body data
345  
346      """
347      results = {}
348      requests = {}
349  
350      assert isinstance(body, dict)
351      assert len(body) == len(hosts)
352      assert compat.all(isinstance(v, str) for v in body.values())
353      assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354          "%s != %s" % (hosts, body.keys())
355  
356      for (name, ip) in hosts:
357        if ip is _OFFLINE:
358          # Node is marked as offline
359          results[name] = RpcResult(node=name, offline=True, call=procedure)
360        else:
361          requests[name] = \
362            http.client.HttpClientRequest(str(ip), port,
363                                          http.HTTP_POST, str("/%s" % procedure),
364                                          headers=_RPC_CLIENT_HEADERS,
365                                          post_data=body[name],
366                                          read_timeout=read_timeout,
367                                          nicename="%s/%s" % (name, procedure),
368                                          curl_config_fn=_ConfigRpcCurl)
369  
370      return (results, requests)
371  
372    @staticmethod
373    def _CombineResults(results, requests, procedure):
374      """Combines pre-computed results for offline hosts with actual call results.
375  
376      """
377      for name, req in requests.items():
378        if req.success and req.resp_status_code == http.HTTP_OK:
379          host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380                                  node=name, call=procedure)
381        else:
382          # TODO: Better error reporting
383          if req.error:
384            msg = req.error
385          else:
386            msg = req.resp_body
387  
388          logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389          host_result = RpcResult(data=msg, failed=True, node=name,
390                                  call=procedure)
391  
392        results[name] = host_result
393  
394      return results
395  
396    def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397                 _req_process_fn=None):
398      """Makes an RPC request to a number of nodes.
399  
400      @type hosts: sequence
401      @param hosts: Hostnames
402      @type procedure: string
403      @param procedure: Request path
404      @type body: dictionary
405      @param body: dictionary with request bodies per host
406      @type read_timeout: int or None
407      @param read_timeout: Read timeout for request
408      @rtype: dictionary
409      @return: a dictionary mapping host names to rpc.RpcResult objects
410  
411      """
412      assert read_timeout is not None, \
413          "Missing RPC read timeout for procedure '%s'" % procedure
414  
415      if _req_process_fn is None:
416        _req_process_fn = http.client.ProcessRequests
417  
418      (results, requests) = \
419        self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420                              procedure, body, read_timeout)
421  
422      _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423  
424      assert not frozenset(results).intersection(requests)
425  
426      return self._CombineResults(results, requests, procedure)
427  
428  
429  class _RpcClientBase:
430    def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431                 _req_process_fn=None):
432      """Initializes this class.
433  
434      """
435      proc = _RpcProcessor(resolver,
436                           netutils.GetDaemonPort(constants.NODED),
437                           lock_monitor_cb=lock_monitor_cb)
438      self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439      self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440  
441    @staticmethod
442    def _EncodeArg(encoder_fn, (argkind, value)):
443      """Encode argument.
444  
445      """
446      if argkind is None:
447        return value
448      else:
449        return encoder_fn(argkind)(value)
450  
451    def _Call(self, cdef, node_list, args):
452      """Entry point for automatically generated RPC wrappers.
453  
454      """
455      (procedure, _, resolver_opts, timeout, argdefs,
456       prep_fn, postproc_fn, _) = cdef
457  
458      if callable(timeout):
459        read_timeout = timeout(args)
460      else:
461        read_timeout = timeout
462  
463      if callable(resolver_opts):
464        req_resolver_opts = resolver_opts(args)
465      else:
466        req_resolver_opts = resolver_opts
467  
468      if len(args) != len(argdefs):
469        raise errors.ProgrammerError("Number of passed arguments doesn't match")
470  
471      enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472      if prep_fn is None:
473        # for a no-op prep_fn, we serialise the body once, and then we
474        # reuse it in the dictionary values
475        body = serializer.DumpJson(enc_args)
476        pnbody = dict((n, body) for n in node_list)
477      else:
478        # for a custom prep_fn, we pass the encoded arguments and the
479        # node name to the prep_fn, and we serialise its return value
480        assert callable(prep_fn)
481        pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482                      for n in node_list)
483  
484      result = self._proc(node_list, procedure, pnbody, read_timeout,
485                          req_resolver_opts)
486  
487      if postproc_fn:
488        return dict(map(lambda (key, value): (key, postproc_fn(value)),
489                        result.items()))
490      else:
491        return result
492  
493  
494  def _ObjectToDict(value):
495    """Converts an object to a dictionary.
496  
497    @note: See L{objects}.
498  
499    """
500    return value.ToDict()
501  
502  
503  def _ObjectListToDict(value):
504    """Converts a list of L{objects} to dictionaries.
505  
506    """
507    return map(_ObjectToDict, value)
508  
509  
510  def _EncodeNodeToDiskDict(value):
511    """Encodes a dictionary with node name as key and disk objects as values.
512  
513    """
514    return dict((name, _ObjectListToDict(disks))
515                for name, disks in value.items())
516  
517  
518  def _PrepareFileUpload(getents_fn, filename):
519    """Loads a file and prepares it for an upload to nodes.
520  
521    """
522    statcb = utils.FileStatHelper()
523    data = _Compress(utils.ReadFile(filename, preread=statcb))
524    st = statcb.st
525  
526    if getents_fn is None:
527      getents_fn = runtime.GetEnts
528  
529    getents = getents_fn()
530  
531    virt_filename = vcluster.MakeVirtualPath(filename)
532  
533    return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535  
536  
537  def _PrepareFinalizeExportDisks(snap_disks):
538    """Encodes disks for finalizing export.
539  
540    """
541    flat_disks = []
542  
543    for disk in snap_disks:
544      if isinstance(disk, bool):
545        flat_disks.append(disk)
546      else:
547        flat_disks.append(disk.ToDict())
548  
549    return flat_disks
550  
551  
552  def _EncodeImportExportIO((ieio, ieioargs)):
553    """Encodes import/export I/O information.
554  
555    """
556    if ieio == constants.IEIO_RAW_DISK:
557      assert len(ieioargs) == 1
558      return (ieio, (ieioargs[0].ToDict(), ))
559  
560    if ieio == constants.IEIO_SCRIPT:
561      assert len(ieioargs) == 2
562      return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
563  
564    return (ieio, ieioargs)
565  
566  
567  def _EncodeBlockdevRename(value):
568    """Encodes information for renaming block devices.
569  
570    """
571    return [(d.ToDict(), uid) for d, uid in value]
572  
573  
574  def MakeLegacyNodeInfo(data, require_vg_info=True):
575    """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
576  
577    Converts the data into a single dictionary. This is fine for most use cases,
578    but some require information from more than one volume group or hypervisor.
579  
580    @param require_vg_info: raise an error if the returned vg_info
581        doesn't have any values
582  
583    """
584    (bootid, vgs_info, (hv_info, )) = data
585  
586    ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
587  
588    if require_vg_info or vgs_info:
589      (vg0_info, ) = vgs_info
590      ret = utils.JoinDisjointDicts(vg0_info, ret)
591  
592    return ret
593  
594  
595  def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
596    """Annotates just DRBD disk layouts.
597  
598    """
599    assert disk.dev_type == constants.LD_DRBD8
600  
601    disk.params = objects.FillDict(drbd_params, disk.params)
602    (dev_data, dev_meta) = disk.children
603    dev_data.params = objects.FillDict(data_params, dev_data.params)
604    dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
605  
606    return disk
607  
608  
609  def _AnnotateDParamsGeneric(disk, (params, )):
610    """Generic disk parameter annotation routine.
611  
612    """
613    assert disk.dev_type != constants.LD_DRBD8
614  
615    disk.params = objects.FillDict(params, disk.params)
616  
617    return disk
618  
619  
620  def AnnotateDiskParams(template, disks, disk_params):
621    """Annotates the disk objects with the disk parameters.
622  
623    @param template: The disk template used
624    @param disks: The list of disk objects to annotate
625    @param disk_params: The disk parameters for annotation
626    @returns: A list of annotated disk objects
627  
628    """
629    ld_params = objects.Disk.ComputeLDParams(template, disk_params)
630  
631    if template == constants.DT_DRBD8:
632      annotation_fn = _AnnotateDParamsDRBD
633    elif template == constants.DT_DISKLESS:
634      annotation_fn = lambda disk, _: disk
635    else:
636      annotation_fn = _AnnotateDParamsGeneric
637  
638    return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
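An orientation sketch of a typical call, mirroring how _DisksDictDP in RpcRunner below uses this function (cfg and instance stand in for the usual configuration and instance objects):

    diskparams = cfg.GetInstanceDiskParams(instance)
    annotated = AnnotateDiskParams(instance.disk_template, instance.disks,
                                   diskparams)
    # each returned disk is a copy with disk.params filled from the LD-level defaults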
639  
640  
641  def _GetESFlag(cfg, nodename):
642    ni = cfg.GetNodeInfo(nodename)
643    if ni is None:
644      raise errors.OpPrereqError("Invalid node name %s" % nodename,
645                                 errors.ECODE_NOENT)
646    return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
647  
648  
649  def GetExclusiveStorageForNodeNames(cfg, nodelist):
650    """Return the exclusive storage flag for all the given nodes.
651  
652    @type cfg: L{config.ConfigWriter}
653    @param cfg: cluster configuration
654    @type nodelist: list or tuple
655    @param nodelist: node names for which to read the flag
656    @rtype: dict
657    @return: mapping from node names to exclusive storage flags
658    @raise errors.OpPrereqError: if any given node name has no corresponding node
659  
660    """
661    getflag = lambda n: _GetESFlag(cfg, n)
662    flags = map(getflag, nodelist)
663    return dict(zip(nodelist, flags))
664  
665  
666  #: Generic encoders
667  _ENCODERS = {
668    rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
669    rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
670    rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
671    rpc_defs.ED_COMPRESS: _Compress,
672    rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
673    rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
674    rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
675    }
676  
677  
678  class RpcRunner(_RpcClientBase,
679                  _generated_rpc.RpcClientDefault,
680                  _generated_rpc.RpcClientBootstrap,
681                  _generated_rpc.RpcClientDnsOnly,
682                  _generated_rpc.RpcClientConfig):
683    """RPC runner class.
684  
685    """
686    def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
687      """Initializes the RPC runner.
688  
689      @type cfg: L{config.ConfigWriter}
690      @param cfg: Configuration
691      @type lock_monitor_cb: callable
692      @param lock_monitor_cb: Lock monitor callback
693  
694      """
695      self._cfg = cfg
696  
697      encoders = _ENCODERS.copy()
698  
699      encoders.update({
700        # Encoders requiring configuration object
701        rpc_defs.ED_INST_DICT: self._InstDict,
702        rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
703        rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
704        rpc_defs.ED_NIC_DICT: self._NicDict,
705  
706        # Encoders annotating disk parameters
707        rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
708        rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
709  
710        # Encoders with special requirements
711        rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
712        })
713  
714      # Resolver using configuration
715      resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
716                                cfg.GetAllNodesInfo)
717  
718      # Pylint doesn't recognize multiple inheritance properly, see
719      # <http://www.logilab.org/ticket/36586> and
720      # <http://www.logilab.org/ticket/35642>
721      # pylint: disable=W0233
722      _RpcClientBase.__init__(self, resolver, encoders.get,
723                              lock_monitor_cb=lock_monitor_cb,
724                              _req_process_fn=_req_process_fn)
725      _generated_rpc.RpcClientConfig.__init__(self)
726      _generated_rpc.RpcClientBootstrap.__init__(self)
727      _generated_rpc.RpcClientDnsOnly.__init__(self)
728      _generated_rpc.RpcClientDefault.__init__(self)
729  
730    def _NicDict(self, nic):
731      """Convert the given nic to a dict and encapsulate netinfo.
732  
733      """
734      n = copy.deepcopy(nic)
735      if n.network:
736        net_uuid = self._cfg.LookupNetwork(n.network)
737        if net_uuid:
738          nobj = self._cfg.GetNetwork(net_uuid)
739          n.netinfo = objects.Network.ToDict(nobj)
740      return n.ToDict()
741  
742    def _InstDict(self, instance, hvp=None, bep=None, osp=None):
743      """Convert the given instance to a dict.
744  
745      This is done via the instance's ToDict() method and additionally
746      we fill the hvparams with the cluster defaults.
747  
748      @type instance: L{objects.Instance}
749      @param instance: an Instance object
750      @type hvp: dict or None
751      @param hvp: a dictionary with overridden hypervisor parameters
752      @type bep: dict or None
753      @param bep: a dictionary with overridden backend parameters
754      @type osp: dict or None
755      @param osp: a dictionary with overridden os parameters
756      @rtype: dict
757      @return: the instance dict, with the hvparams filled with the
758          cluster defaults
759  
760      """
761      idict = instance.ToDict()
762      cluster = self._cfg.GetClusterInfo()
763      idict["hvparams"] = cluster.FillHV(instance)
764      if hvp is not None:
765        idict["hvparams"].update(hvp)
766      idict["beparams"] = cluster.FillBE(instance)
767      if bep is not None:
768        idict["beparams"].update(bep)
769      idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
770      if osp is not None:
771        idict["osparams"].update(osp)
772      idict["disks"] = self._DisksDictDP((instance.disks, instance))
773      for nic in idict["nics"]:
774        nic["nicparams"] = objects.FillDict(
775          cluster.nicparams[constants.PP_DEFAULT],
776          nic["nicparams"])
777        network = nic.get("network", None)
778        if network:
779          net_uuid = self._cfg.LookupNetwork(network)
780          if net_uuid:
781            nobj = self._cfg.GetNetwork(net_uuid)
782            nic["netinfo"] = objects.Network.ToDict(nobj)
783      return idict
784  
785    def _InstDictHvpBepDp(self, (instance, hvp, bep)):
786      """Wrapper for L{_InstDict}.
787  
788      """
789      return self._InstDict(instance, hvp=hvp, bep=bep)
790  
791    def _InstDictOspDp(self, (instance, osparams)):
792      """Wrapper for L{_InstDict}.
793  
794      """
795      return self._InstDict(instance, osp=osparams)
796  
797    def _DisksDictDP(self, (disks, instance)):
798      """Wrapper for L{AnnotateDiskParams}.
799  
800      """
801      diskparams = self._cfg.GetInstanceDiskParams(instance)
802      return [disk.ToDict()
803              for disk in AnnotateDiskParams(instance.disk_template,
804                                             disks, diskparams)]
805  
806    def _SingleDiskDictDP(self, (disk, instance)):
807      """Wrapper for L{AnnotateDiskParams}.
808  
809      """
810      (anno_disk,) = self._DisksDictDP(([disk], instance))
811      return anno_disk
812  
813  
814  class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
815    """RPC wrappers for job queue.
816  
817    """
818    def __init__(self, context, address_list):
819      """Initializes this class.
820  
821      """
822      if address_list is None:
823        resolver = compat.partial(_SsconfResolver, True)
824      else:
825        # Caller provided an address list
826        resolver = _StaticResolver(address_list)
827  
828      _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
829                              lock_monitor_cb=context.glm.AddToLockMonitor)
830      _generated_rpc.RpcClientJobQueue.__init__(self)
831  
832  
833  class BootstrapRunner(_RpcClientBase,
834                        _generated_rpc.RpcClientBootstrap,
835                        _generated_rpc.RpcClientDnsOnly):
836    """RPC wrappers for bootstrapping.
837  
838    """
839    def __init__(self):
840      """Initializes this class.
841  
842      """
843      # Pylint doesn't recognize multiple inheritance properly, see
844      # <http://www.logilab.org/ticket/36586> and
845      # <http://www.logilab.org/ticket/35642>
846      # pylint: disable=W0233
847      _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
848                              _ENCODERS.get)
849      _generated_rpc.RpcClientBootstrap.__init__(self)
850      _generated_rpc.RpcClientDnsOnly.__init__(self)
851  
852  
853  class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
854    """RPC wrappers for calls using only DNS.
855  
856    """
857    def __init__(self):
858      """Initialize this class.
859  
860      """
861      _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
862                              _ENCODERS.get)
863      _generated_rpc.RpcClientDnsOnly.__init__(self)
864  
865  
866  class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
867    """RPC wrappers for L{config}.
868  
869    """
870    def __init__(self, context, address_list, _req_process_fn=None,
871                 _getents=None):
872      """Initializes this class.
873  
874      """
875      if context:
876        lock_monitor_cb = context.glm.AddToLockMonitor
877      else:
878        lock_monitor_cb = None
879  
880      if address_list is None:
881        resolver = compat.partial(_SsconfResolver, True)
882      else:
883        # Caller provided an address list
884        resolver = _StaticResolver(address_list)
885  
886      encoders = _ENCODERS.copy()
887  
888      encoders.update({
889        rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
890        })
891  
892      _RpcClientBase.__init__(self, resolver, encoders.get,
893                              lock_monitor_cb=lock_monitor_cb,
894                              _req_process_fn=_req_process_fn)
895      _generated_rpc.RpcClientConfig.__init__(self)
896  
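As a rough orientation only (the wiring below is illustrative, not how the daemons actually construct these objects): RpcRunner resolves addresses from the cluster configuration and therefore needs a config.ConfigWriter, BootstrapRunner and DnsOnlyRunner fall back to ssconf or plain DNS, and JobQueueRunner/ConfigRunner can take an explicit address list. The generated call_* wrappers come from rpc_defs and are not shown here.

    dns_runner = DnsOnlyRunner()                     # node names resolved via DNS only
    cfg_writer_runner = ConfigRunner(None, ["192.0.2.10"])   # static address, no lock monitor
    main_runner = RpcRunner(cfg, None)               # cfg: a config.ConfigWriter instance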