Package ganeti :: Module rpc
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Inter-node RPC library. 
  23   
  24  """ 
  25   
  26  # pylint: disable=C0103,R0201,R0904 
  27  # C0103: Invalid name, since call_ are not valid 
  28  # R0201: Method could be a function, we keep all rpcs instance methods 
  29  # as not to change them back and forth between static/instance methods 
  30  # if they need to start using instance attributes 
  31  # R0904: Too many public methods 
  32   
  33  import logging 
  34  import zlib 
  35  import base64 
  36  import pycurl 
  37  import threading 
  38  import copy 
  39   
  40  from ganeti import utils 
  41  from ganeti import objects 
  42  from ganeti import http 
  43  from ganeti import serializer 
  44  from ganeti import constants 
  45  from ganeti import errors 
  46  from ganeti import netutils 
  47  from ganeti import ssconf 
  48  from ganeti import runtime 
  49  from ganeti import compat 
  50  from ganeti import rpc_defs 
  51  from ganeti import pathutils 
  52  from ganeti import vcluster 
  53   
  54  # Special module generated at build time 
  55  from ganeti import _generated_rpc 
  56   
  57  # pylint has a bug here, doesn't see this import 
  58  import ganeti.http.client  # pylint: disable=W0611 
  59   
  60   
  61  _RPC_CLIENT_HEADERS = [ 
  62    "Content-type: %s" % http.HTTP_APP_JSON, 
  63    "Expect:", 
  64    ] 
  65   
  66  #: Special value to describe an offline host 
  67  _OFFLINE = object() 
def Init():
  """Sets up the module-global pycURL state.

  Must be called before any RPC function is used, and only while a single
  thread is running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) may only run while no
  # other thread is active; this assertion is a best-effort safety net and
  # does not cover all cases.
  active = threading.activeCount()
  assert active == 1, \
    "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)
  pycurl.global_init(pycurl.GLOBAL_ALL)
86
def Shutdown():
  """Tears down the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread
  is running.

  """
  pycurl.global_cleanup()
96
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node daemon RPC.

  The node daemon certificate file serves as CA bundle, client certificate
  and client key at once; peer verification stays enabled while hostname
  verification is intentionally turned off.

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  for (option, value) in [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, noded_cert),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, noded_cert),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, noded_cert),
    (pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT),
    ]:
    curl.setopt(option, value)
110
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  The decorated function runs with the RPC system initialized, and the
  system is shut down again afterwards. The function must therefore be
  called while RPC is not yet initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always clean up pycURL state, even if fn raised
      Shutdown()
  return wrapper
def _Compress(data):
  """Prepares a string for transport over RPC.

  Payloads shorter than 512 bytes are sent verbatim; anything larger is
  zlib-compressed and base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Compress with zlib (level 3) and encode in base64
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))
  # Small amounts of data are not worth compressing
  return (constants.RPC_ENCODING_NONE, data)
147
class RpcResult(object):
  """RPC Result class.

  Holds the outcome of a single RPC call. In multi-node calls an exception
  cannot be raised just because one out of many nodes failed, so each
  per-node outcome is encapsulated in an instance of this class.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      # Offline nodes never carry a payload
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      self.payload = None
      # A well-formed result is a two-element (success, payload) sequence
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(data))
      elif not data[0]:
        self.fail_msg = self._EnsureErr(data[1])
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr in ["call", "data", "fail_msg",
                 "node", "offline", "payload"]:
      assert hasattr(self, attr), "Missing attribute %s" % attr

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if msg:  # caller-supplied message gets the failure appended
      msg = "%s: %s" % (msg, self.fail_msg)
    else:  # one could pass None for the default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))

    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      raise ec(msg, ecode)
    raise ec(msg)

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used in cases where an LU wants to warn the user about a
    failure, but continue anyway.

    """
    if self.fail_msg:
      feedback_fn("%s: %s" % (msg, self.fail_msg))
245
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is a whitespace-separated "name ip" pair
    ipmap = dict(entry.split()
                 for entry in store.GetNodePrimaryIPList())
  else:
    ipmap = {}

  def _Resolve(name):
    ip = ipmap.get(name)
    if ip is None:
      # Not in ssconf (or ssconf disabled): fall back to DNS
      ip = nslookup_fn(name, family=family)
    return ip

  return [(name, _Resolve(name), name) for name in node_list]
281
282 283 -class _StaticResolver:
284 - def __init__(self, addresses):
285 """Initializes this class. 286 287 """ 288 self._addresses = addresses
289
290 - def __call__(self, hosts, _):
291 """Returns static addresses for hosts. 292 293 """ 294 assert len(hosts) == len(self._addresses) 295 return zip(hosts, self._addresses, hosts)
296
297 298 -def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
299 """Checks if a node is online. 300 301 @type node_uuid_or_name: string 302 @param node_uuid_or_name: Node UUID 303 @type node: L{objects.Node} or None 304 @param node: Node object 305 306 """ 307 if node is None: 308 # Assume that the passed parameter was actually a node name, so depend on 309 # DNS for name resolution 310 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) 311 else: 312 if node.offline and not accept_offline_node: 313 ip = _OFFLINE 314 else: 315 ip = node.primary_ip 316 return (node.name, ip, node_uuid_or_name)
317
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is
  not found in the configuration.

  """
  accept_offline = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
  assert accept_offline or opts is None, "Unknown option"

  if len(node_uuids) != 1:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None), accept_offline)
            for uuid in node_uuids]

  # Special case for single-host lookups
  (uuid, ) = node_uuids
  return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline)]
339
class _RpcProcessor:
  """Makes RPC requests to a set of nodes and collects per-node results.

  Callable; see L{__call__} for the main entry point.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type hosts: list of tuples
    @param hosts: resolver output; (name, ip, original_name) per host
    @type body: dict
    @param body: a dictionary with per-host body data

    """
    # Offline hosts get a pre-computed RpcResult; the rest get an HTTP request
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    # Every resolved host must have a matching body entry (keyed by the
    # original, pre-resolution name)
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dict of L{RpcResult} for offline hosts, updated in place
    @param requests: dict of completed HTTP client requests
    @return: C{results}, now covering all hosts

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # Offline results and live results must not overlap
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
448
class _RpcClientBase:
  """Shared base for the generated RPC client classes.

  Wires a resolver and an argument encoder into a L{_RpcProcessor} and
  provides L{_Call}, the entry point used by the generated wrappers.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, (argkind, value)):
    """Encode argument.

    """
    # A None kind means the value is passed through unencoded
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout and resolver options may be static values or callables
    # computed from the call arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
    if prep_fn is None:
      # for a no-op prep_fn, we serialise the body once, and then we
      # reuse it in the dictionary values
      body = serializer.DumpJson(enc_args)
      pnbody = dict((n, body) for n in node_list)
    else:
      # for a custom prep_fn, we pass the encoded arguments and the
      # node name to the prep_fn, and we serialise its return value
      assert callable(prep_fn)
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                    for n in node_list)

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    if postproc_fn:
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
513
514 515 -def _ObjectToDict(value):
516 """Converts an object to a dictionary. 517 518 @note: See L{objects}. 519 520 """ 521 return value.ToDict()
522
def _ObjectListToDict(value):
  """Serializes a list of L{objects} to dictionaries.

  """
  # Note: returns whatever map() yields, matching callers' expectations
  return map(_ObjectToDict, value)
529
def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  """
  result = {}
  for (name, disks) in value.items():
    result[name] = _ObjectListToDict(disks)
  return result
537
def _PrepareFileUpload(getents_fn, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning a name-lookup object, or None to
    use L{runtime.GetEnts}
  @param filename: path of the file to upload

  """
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  # Fall back to the default entity lookup if none was injected
  getents = (getents_fn or runtime.GetEnts)()

  return [vcluster.MakeVirtualPath(filename), data, st.st_mode,
          getents.LookupUid(st.st_uid), getents.LookupGid(st.st_gid),
          st.st_atime, st.st_mtime]
556
557 558 -def _PrepareFinalizeExportDisks(snap_disks):
559 """Encodes disks for finalizing export. 560 561 """ 562 flat_disks = [] 563 564 for disk in snap_disks: 565 if isinstance(disk, bool): 566 flat_disks.append(disk) 567 else: 568 flat_disks.append(disk.ToDict()) 569 570 return flat_disks
571
572 573 -def _EncodeImportExportIO((ieio, ieioargs)):
574 """Encodes import/export I/O information. 575 576 """ 577 if ieio == constants.IEIO_RAW_DISK: 578 assert len(ieioargs) == 1 579 return (ieio, (ieioargs[0].ToDict(), )) 580 581 if ieio == constants.IEIO_SCRIPT: 582 assert len(ieioargs) == 2 583 return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) 584 585 return (ieio, ieioargs)
586
587 588 -def _EncodeBlockdevRename(value):
589 """Encodes information for renaming block devices. 590 591 """ 592 return [(d.ToDict(), uid) for d, uid in value]
593
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable
  @raise errors.OpExecError: if no LVM PV information is present

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if not lvm_pv_info:
    raise errors.OpExecError("No spindle storage information available.")
  result["spindles_free"] = lvm_pv_info["storage_free"]
  result["spindles_total"] = lvm_pv_info["storage_size"]
615
def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info):
  """Extracts the storage space information of the default storage type from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  # There are no defaults if the list is empty or its only row is the
  # spindle (LVM PV) information
  no_defaults = (len(space_info) < 1) or \
      (space_info[0]["type"] == constants.ST_LVM_PV and len(space_info) == 1)

  if no_defaults:
    logging.warning("No storage info provided for default storage type.")
    return

  default_space_info = space_info[0]
  if default_space_info:
    result["name"] = default_space_info["name"]
    result["storage_free"] = default_space_info["storage_free"]
    result["storage_size"] = default_space_info["storage_size"]
638
def MakeLegacyNodeInfo(data, require_spindles=False):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use
  cases, but some require information from more than one volume group or
  hypervisor.

  @param require_spindles: add spindle storage information to the legacy
    node info

  """
  (bootid, space_info, (hv_info, )) = data

  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
  if require_spindles:
    _AddSpindlesToLegacyNodeInfo(ret, space_info)
  _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info)
  return ret
659
660 661 -def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
662 """Annotates just DRBD disks layouts. 663 664 """ 665 assert disk.dev_type == constants.DT_DRBD8 666 667 disk.params = objects.FillDict(drbd_params, disk.params) 668 (dev_data, dev_meta) = disk.children 669 dev_data.params = objects.FillDict(data_params, dev_data.params) 670 dev_meta.params = objects.FillDict(meta_params, dev_meta.params) 671 672 return disk
673
674 675 -def _AnnotateDParamsGeneric(disk, (params, )):
676 """Generic disk parameter annotation routine. 677 678 """ 679 assert disk.dev_type != constants.DT_DRBD8 680 681 disk.params = objects.FillDict(params, disk.params) 682 683 return disk
684
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  # Pick the annotation routine matching the disk template
  if template == constants.DT_DRBD8:
    annotate = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    annotate = lambda disk, _: disk
  else:
    annotate = _AnnotateDParamsGeneric

  # Work on copies so the caller's disk objects stay untouched
  return [annotate(disk.Copy(), ld_params) for disk in disks]
705
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Returns the exclusive storage node parameter of a node.

  @raise errors.OpPrereqError: if the node is not known

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
713
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  lvm_types = frozenset([constants.ST_LVM_VG, constants.ST_LVM_PV])
  return [(storage_type, storage_key,
           [es_flag] if storage_type in lvm_types else [])
          for (storage_type, storage_key) in storage_units]
738
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
756
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                 storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
#: Generic encoders usable without a configuration object; encoders that
#: need a L{config.ConfigWriter} are added separately in L{RpcRunner.__init__}
#: and L{ConfigRunner.__init__}
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initialized the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep-copy so attaching netinfo does not modify the caller's NIC object
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill hypervisor/backend/OS parameters from cluster defaults, then
    # apply the caller's overrides on top
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, osp=osparams)

  def _DisksDictDP(self, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    return [disk.ToDict()
            for disk in AnnotateDiskParams(instance.disk_template,
                                           disks, diskparams)]

  def _MultiDiskDictDP(self, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(disk_inst)]

  def _SingleDiskDictDP(self, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(([disk], instance))
    return anno_disk
939
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is None:
      # No addresses given; resolve node names via ssconf IPs
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
958
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    resolver = compat.partial(_SsconfResolver, True)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
978
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # ssconf IPs are disabled here, so names are resolved via DNS lookup
    resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
991
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      # No addresses given; resolve node names via ssconf IPs
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    # Extend the generic encoders with the file upload encoder, which needs
    # the (possibly injected) entity lookup function
    encoders = dict(_ENCODERS)
    encoders[rpc_defs.ED_FILE_DETAILS] = \
      compat.partial(_PrepareFileUpload, _getents)

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
1023