Package ganeti :: Module rpc
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Inter-node RPC library. 
  32   
  33  """ 
  34   
  35  # pylint: disable=C0103,R0201,R0904 
  36  # C0103: Invalid name, since call_ are not valid 
  37  # R0201: Method could be a function, we keep all rpcs instance methods 
  38  # as not to change them back and forth between static/instance methods 
  39  # if they need to start using instance attributes 
  40  # R0904: Too many public methods 
  41   
  42  import logging 
  43  import zlib 
  44  import base64 
  45  import pycurl 
  46  import threading 
  47  import copy 
  48   
  49  from ganeti import utils 
  50  from ganeti import objects 
  51  from ganeti import http 
  52  from ganeti import serializer 
  53  from ganeti import constants 
  54  from ganeti import errors 
  55  from ganeti import netutils 
  56  from ganeti import ssconf 
  57  from ganeti import runtime 
  58  from ganeti import compat 
  59  from ganeti import rpc_defs 
  60  from ganeti import pathutils 
  61  from ganeti import vcluster 
  62   
  63  # Special module generated at build time 
  64  from ganeti import _generated_rpc 
  65   
  66  # pylint has a bug here, doesn't see this import 
  67  import ganeti.http.client  # pylint: disable=W0611 
  68   
  69   
#: HTTP headers sent with every RPC request.  The empty "Expect:" header
#: suppresses cURL's automatic "Expect: 100-continue" handshake, which
#: would otherwise add a round trip per request.
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host; used as the IP address in
#: resolver results and compared with "is" (identity), never equality
_OFFLINE = object()
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) are only safe while a
  # single thread is running; this assertion is a best-effort safety net,
  # not a complete guarantee
  assert threading.activeCount() == 1, \
      "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
95
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  # Counterpart of L{Init}: releases pycURL's global state
  pycurl.global_cleanup()
105
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for RPC use.

  Sets up TLS client authentication with the node daemon certificate and
  the RPC connect timeout.

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  # NOTE(review): SSL_VERIFYHOST is disabled while SSL_VERIFYPEER stays on;
  # presumably the per-node certificate has no usable host name -- confirm
  for (option, value) in [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, noded_cert),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, noded_cert),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, noded_cert),
    (pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT),
    ]:
    curl.setopt(option, value)
119
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shuts down the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  def wrapper(*args, **kwargs):
    # Bring up pycURL, run the wrapped function, always clean up again
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()
  return wrapper
def _Compress(_, data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Large enough to be worth it: zlib level 3, base64 for JSON transport
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Below the threshold the compression overhead would not pay off
  return (constants.RPC_ENCODING_NONE, data)
156
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node
    # Default everything to "no result"; the branches below fill in the
    # appropriate fields
    self.data = self.payload = self.fail_msg = None

    if offline:
      self.fail_msg = "Node is marked offline"
    elif failed:
      self.fail_msg = self._EnsureErr(data)
    else:
      self.data = data
      # A valid payload is a two-element sequence (status, value)
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(data))
      elif not data[0]:
        self.fail_msg = self._EnsureErr(data[1])
      else:
        # finally success
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if msg:
      msg = "%s: %s" % (msg, self.fail_msg)
    else:
      # None (or empty) message: build a default one
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    if ecode is None:
      raise exc_class(msg)
    raise exc_class(msg, ecode)

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if self.fail_msg:
      feedback_fn("%s: %s" % (msg, self.fail_msg))
254
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string, string)
  @return: List of tuples containing node name, IP address and original name

  """
  ss = ssc()
  family = ss.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is a "name ip" pair
    ipmap = dict(entry.split() for entry in ss.GetNodePrimaryIPList())
  else:
    ipmap = {}

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      # Not known to ssconf (or ssconf disabled): fall back to DNS
      ip = nslookup_fn(node, family=family)
    result.append((node, ip, node))

  return result
290
291 292 -class _StaticResolver:
293 - def __init__(self, addresses):
294 """Initializes this class. 295 296 """ 297 self._addresses = addresses
298
299 - def __call__(self, hosts, _):
300 """Returns static addresses for hosts. 301 302 """ 303 assert len(hosts) == len(self._addresses) 304 return zip(hosts, self._addresses, hosts)
305
306 307 -def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
308 """Checks if a node is online. 309 310 @type node_uuid_or_name: string 311 @param node_uuid_or_name: Node UUID 312 @type node: L{objects.Node} or None 313 @param node: Node object 314 315 """ 316 if node is None: 317 # Assume that the passed parameter was actually a node name, so depend on 318 # DNS for name resolution 319 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) 320 else: 321 if node.offline and not accept_offline_node: 322 ip = _OFFLINE 323 else: 324 ip = node.primary_ip 325 return (node.name, ip, node_uuid_or_name)
326
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  if len(node_uuids) == 1:
    # Single-host lookup: avoid fetching the whole node list
    (uuid, ) = node_uuids
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]

  all_nodes = all_nodes_fn()
  return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
                           accept_offline_node)
          for uuid in node_uuids]
348
class _RpcProcessor:
  # Low-level RPC engine: resolves node names to addresses, builds HTTP
  # requests, runs them through the HTTP client and wraps all outcomes
  # (including pre-computed offline results) into L{RpcResult} objects.
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type hosts: list of tuples; (name, ip, original name)
    @param hosts: resolver output for all target nodes
    @type body: dict
    @param body: a dictionary with per-host body data
    @rtype: tuple of two dicts
    @return: (results, requests); offline hosts get an L{RpcResult} in
        C{results}, all others an C{HttpClientRequest} in C{requests},
        both keyed by the original (pre-resolution) name

    """
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    # The body must contain exactly one entry per resolved host
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @param resolver_opts: options passed through to the resolver
    @param _req_process_fn: request processor override, for tests only
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    # Runs all pending requests; their outcome is recorded on the request
    # objects themselves
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A host is either pre-resolved offline or actually requested, never both
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
457
class _RpcClientBase:
  # Shared plumbing for the generated RPC client mix-ins: owns the request
  # processor and the per-argument encoder used by L{_Call}.
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: resolver callable, see L{_RpcProcessor.__init__}
    @param encoder_fn: callable mapping an encoder constant (see
        L{rpc_defs}) to the actual encoder function
    @param lock_monitor_cb: callable for registering with the lock monitor
    @param _req_process_fn: request processor override, for tests only

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, node, (argkind, value)):
    """Encode argument.

    @param encoder_fn: callable returning the encoder for a given kind
    @param node: node name the value is encoded for
    @param argkind: encoder constant from L{rpc_defs}, or None
    @param value: raw argument value

    """
    if argkind is None:
      # No encoder declared for this argument: pass it through unchanged
      return value
    else:
      return encoder_fn(argkind)(node, value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to call
    @param args: positional call arguments, one per entry in the
        definition's argument list

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout and resolver options may be given either as constants or as
    # callables computing them from the call's arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    if prep_fn is None:
      prep_fn = lambda _, args: args
    assert callable(prep_fn)

    # encode the arguments for each node individually, pass them and the node
    # name to the prep_fn, and serialise its return value
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
                                      zip(map(compat.snd, argdefs), args))
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
                  for n in node_list)

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    if postproc_fn:
      # Apply the post-processor to every per-host result
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
520
521 522 -def _ObjectToDict(_, value):
523 """Converts an object to a dictionary. 524 525 @note: See L{objects}. 526 527 """ 528 return value.ToDict()
529
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  """
  return [_ObjectToDict(node, elem) for elem in value]
536
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning an entity resolver (defaults to
      L{runtime.GetEnts}), used to translate numeric uid/gid to names
  @param node: node name, forwarded to the compression encoder
  @type filename: string
  @param filename: path of the file to upload
  @rtype: list
  @return: file details suitable for the upload RPC: virtual path,
      (possibly compressed) contents, mode, owner, group, atime, mtime

  """
  # FileStatHelper captures the stat result during ReadFile, so contents
  # and metadata come from the same point in time
  statcb = utils.FileStatHelper()
  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  # Translate to a path valid inside a virtual cluster, if one is in use
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
555
556 557 -def _PrepareFinalizeExportDisks(_, snap_disks):
558 """Encodes disks for finalizing export. 559 560 """ 561 flat_disks = [] 562 563 for disk in snap_disks: 564 if isinstance(disk, bool): 565 flat_disks.append(disk) 566 else: 567 flat_disks.append(disk.ToDict()) 568 569 return flat_disks
570
571 572 -def _EncodeBlockdevRename(_, value):
573 """Encodes information for renaming block devices. 574 575 """ 576 return [(d.ToDict(), uid) for d, uid in value]
577
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if lvm_pv_info:
    (free, total) = (lvm_pv_info["storage_free"], lvm_pv_info["storage_size"])
  else:
    # No LVM physical volume information available
    (free, total) = (0, 0)
  result["spindles_free"] = free
  result["spindles_total"] = total
600
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
620
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  @param data: (bootid, space_info, hv_info list) triple; exactly one
      hypervisor entry is expected
  @param disk_template: disk template to report storage space for
  @rtype: dict
  @return: merged hypervisor information, boot id, spindle and storage data

  """
  (bootid, space_info, (hv_info, )) = data

  # Key sets of hv_info and the bootid entry must be disjoint
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  _AddSpindlesToLegacyNodeInfo(ret, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)

  return ret
637
638 639 -def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
640 """Annotates just DRBD disks layouts. 641 642 """ 643 assert disk.dev_type == constants.DT_DRBD8 644 645 disk.params = objects.FillDict(drbd_params, disk.params) 646 (dev_data, dev_meta) = disk.children 647 dev_data.params = objects.FillDict(data_params, dev_data.params) 648 dev_meta.params = objects.FillDict(meta_params, dev_meta.params) 649 650 return disk
651
652 653 -def _AnnotateDParamsGeneric(disk, (params, )):
654 """Generic disk parameter annotation routine. 655 656 """ 657 assert disk.dev_type != constants.DT_DRBD8 658 659 disk.params = objects.FillDict(params, disk.params) 660 661 return disk
662
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _Annotate(disk):
    # Diskless disks carry no parameters
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    # DRBD needs per-child annotation, everything else is uniform
    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Work on copies so the callers' disk objects stay untouched
  return [_Annotate(disk.Copy()) for disk in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Returns the exclusive storage flag of the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuid: string
  @param node_uuid: node UUID
  @rtype: bool
  @raise errors.OpPrereqError: if the node cannot be found

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
692
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  result = []
  for (storage_type, storage_key) in storage_units:
    if storage_type == constants.ST_LVM_VG:
      result.append((storage_type, storage_key, [es_flag]))
      if es_flag:
        # With exclusive storage, PV information is needed as well
        result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
    else:
      # Non-LVM units get no extra parameters
      result.append((storage_type, storage_key, []))
  return result
719
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
737
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict(
      (node_uuid,
       _AddExclusiveStorageFlagToLvmStorageUnits(
           storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
      for node_uuid in node_uuids)
#: Generic encoders: map encoder constants from L{rpc_defs} to functions
#: taking (node, value) and returning a serializable representation.
#: Encoders that need the cluster configuration are added separately in
#: L{RpcRunner.__init__}.
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Combines all generated RPC client mix-ins with encoders that need access
  to the cluster configuration.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initializes the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request processor override, for tests only
    @param _getents: entity resolver override, for tests only

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep-copy so the configuration's NIC object is never modified
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Encodes a NIC or disk device for RPC.

    NOTE(review): falls through (returning None) for any other device
    type -- presumably callers only ever pass NICs or disks; confirm.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @param node: node name the dict is prepared for
    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill cluster defaults first, then apply explicit overrides on top
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      # Attach network information where a network is referenced
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      # NOTE: "node" in the generator expression below shadows the method
      # parameter only inside the expression; the call afterwards still
      # uses the parameter
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.

    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    """
    if ieio == constants.IEIO_RAW_DISK:
      # Single argument: the disk/instance pair to encode
      assert len(ieioargs) == 1
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))

    if ieio == constants.IEIO_SCRIPT:
      # Two arguments: disk/instance pair and the disk index
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    # All other kinds are passed through unchanged
    return (ieio, ieioargs)
957
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: masterd context, used for the lock monitor
    @param address_list: explicit node addresses, or None to use ssconf

    """
    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to the primary IPs recorded in ssconf
      resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
976
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Node addresses come from ssconf (first argument True enables the
    # ssconf IP list, with DNS as fallback)
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
996
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # ssconf_ips=False: skip the ssconf IP list and resolve all node
    # names through DNS only
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1009
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    @param context: masterd context (may be None), used for the lock monitor
    @param address_list: explicit node addresses, or None to use ssconf
    @param _req_process_fn: request processor override, for tests only
    @param _getents: entity resolver override, for tests only

    """
    lock_monitor_cb = context.glm.AddToLockMonitor if context else None

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to the primary IPs recorded in ssconf
      resolver = compat.partial(_SsconfResolver, True)

    encoders = _ENCODERS.copy()
    # File uploads need the (possibly overridden) entity resolver
    encoders[rpc_defs.ED_FILE_DETAILS] = compat.partial(_PrepareFileUpload,
                                                        _getents)

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
1041