Package ganeti :: Package rpc :: Module node
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Inter-node RPC library. 
  32   
  33  """ 
  34   
  35  # pylint: disable=C0103,R0201,R0904 
  36  # C0103: Invalid name, since call_ are not valid 
  37  # R0201: Method could be a function, we keep all rpcs instance methods 
  38  # as not to change them back and forth between static/instance methods 
  39  # if they need to start using instance attributes 
  40  # R0904: Too many public methods 
  41   
import base64
import copy
import functools
import logging
import os
import threading
import zlib

import pycurl
  49   
  50  from ganeti import utils 
  51  from ganeti import objects 
  52  from ganeti import http 
  53  from ganeti import serializer 
  54  from ganeti import constants 
  55  from ganeti import errors 
  56  from ganeti import netutils 
  57  from ganeti import ssconf 
  58  from ganeti import runtime 
  59  from ganeti import compat 
  60  from ganeti import rpc_defs 
  61  from ganeti import pathutils 
  62  from ganeti import vcluster 
  63   
  64  # Special module generated at build time 
  65  from ganeti import _generated_rpc 
  66   
  67  # pylint has a bug here, doesn't see this import 
  68  import ganeti.http.client  # pylint: disable=W0611 
  69   
  70   
  71  _RPC_CLIENT_HEADERS = [ 
  72    "Content-type: %s" % http.HTTP_APP_JSON, 
  73    "Expect:", 
  74    ] 
  75   
  76  #: Special value to describe an offline host 
  77  _OFFLINE = object() 
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running. Pair every successful call with L{Shutdown} (see L{RunWithRPC}).

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
      "Found more than one active thread when initializing pycURL"

  # Record which libcurl build is in use; helps debugging TLS issues
  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
96
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running. Counterpart of L{Init}.

  """
  pycurl.global_cleanup()
106
def _ConfigRpcCurl(curl):
  """Configures a cURL handle for Ganeti-internal RPC calls.

  Sets the CA and client certificate/key paths, peer verification and the
  connect timeout on the given handle.

  @type curl: pycurl.Curl
  @param curl: cURL object to configure

  """
  noded_cert = pathutils.NODED_CERT_FILE
  noded_client_cert = pathutils.NODED_CLIENT_CERT_FILE

  # This fallback is required for backwards compatibility with 2.10. Ganeti
  # 2.11 introduced per-node client certificates, but when we restart after
  # an upgrade from 2.10, the client certs are not in place yet, and we need
  # to fall back to using the cluster-wide server cert.
  if not os.path.exists(noded_client_cert):
    # FIX: message used to read "RPCcall." (missing space); also use the
    # non-deprecated logging.warning() instead of the warn() alias
    logging.warning("Using server certificate as client certificate for RPC"
                    " call.")
    noded_client_cert = noded_cert

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # Hostname verification is disabled; the peer certificate is still
  # verified against the cluster CA via SSL_VERIFYPEER below
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_client_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_client_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
130
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shuts down the system afterwards. This means the
  function must be called without RPC being initialized.

  @type fn: callable
  @param fn: function to wrap
  @rtype: callable
  @return: wrapper calling C{fn} between L{Init} and L{Shutdown}

  """
  # FIX: use functools.wraps so the wrapper keeps fn's name/docstring
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always clean up, even if fn raised
      Shutdown()
  return wrapper
def _Compress(_, data):
  """Compresses a string for transport over RPC.

  Payloads shorter than 512 bytes are passed through verbatim; larger ones
  are zlib-compressed and base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Compress with zlib (level 3) and encode in base64
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Small amounts of data are not compressed
  return (constants.RPC_ENCODING_NONE, data)
167
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # A well-formed result is a 2-element (status, payload) sequence
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  def __repr__(self):
    # FIX: the first format argument used to be self.offline, so the
    # "data=..." part of the representation showed the offline flag instead
    # of the actual payload
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
            (self.data, self.call, self.node, self.offline, self.fail_msg))

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @type msg: string
    @param msg: message prefix; if empty/None, a default message is built
    @type prereq: bool
    @param prereq: if True, raise OpPrereqError instead of OpExecError
    @param ecode: optional error code passed to the exception

    """
    if not self.fail_msg:
      return

    if not msg:  # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args)  # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
269
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is a whitespace-separated "name ip" pair
    ip_by_name = dict(entry.split()
                      for entry in store.GetNodePrimaryIPList())
  else:
    ip_by_name = {}

  resolved = []
  for name in node_list:
    addr = ip_by_name.get(name)
    if addr is None:
      # Not known via ssconf, fall back to a name-service lookup
      addr = nslookup_fn(name, family=family)
    resolved.append((name, addr, name))

  return resolved
305
306 307 -class _StaticResolver:
308 - def __init__(self, addresses):
309 """Initializes this class. 310 311 """ 312 self._addresses = addresses
313
314 - def __call__(self, hosts, _):
315 """Returns static addresses for hosts. 316 317 """ 318 assert len(hosts) == len(self._addresses) 319 return zip(hosts, self._addresses, hosts)
320
321 322 -def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
323 """Checks if a node is online. 324 325 @type node_uuid_or_name: string 326 @param node_uuid_or_name: Node UUID 327 @type node: L{objects.Node} or None 328 @param node: Node object 329 330 """ 331 if node is None: 332 # Assume that the passed parameter was actually a node name, so depend on 333 # DNS for name resolution 334 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) 335 else: 336 if node.offline and not accept_offline_node: 337 ip = _OFFLINE 338 else: 339 ip = node.primary_ip 340 return (node.name, ip, node_uuid_or_name)
341
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  """
  accept_offline = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline or opts is None, "Unknown option"

  if len(node_uuids) != 1:
    # General case: fetch the whole node map once
    node_map = all_nodes_fn()
    return [_CheckConfigNode(uuid, node_map.get(uuid, None), accept_offline)
            for uuid in node_uuids]

  # Special case for single-host lookups, avoiding a full config scan
  (uuid, ) = node_uuids
  return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline)]
363
class _RpcProcessor:
  """Turns resolved node lists into HTTP requests and collects the results.

  Instances are callable; see L{__call__}.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type body: dict
    @param body: a dictionary with per-host body data

    """
    # results: pre-computed RpcResult objects for offline nodes
    # requests: HttpClientRequest objects, keyed by original node name
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    # body must be keyed by the *original* names (third tuple element)
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    # Execute all requests for the online nodes in parallel
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # Offline (pre-computed) results and live requests must not overlap
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
472
class _RpcClientBase:
  """Base class for the generated RPC client classes.

  Combines a node resolver and an argument-encoder lookup with a
  L{_RpcProcessor}; the generated wrappers call L{_Call}.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: node resolver passed to L{_RpcProcessor}
    @param encoder_fn: callable mapping an argument kind to an encoder
    @param lock_monitor_cb: Callable for registering with lock monitor
    @param _req_process_fn: request processor override (for tests)

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, node, (argkind, value)):
    """Encode argument.

    Arguments without a declared kind (None) are passed through unchanged.

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(node, value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to contact
    @param args: positional call arguments, matching the definition

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout and resolver options may be static values or callables
    # computed from the call arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    if prep_fn is None:
      prep_fn = lambda _, args: args
    assert callable(prep_fn)

    # encode the arguments for each node individually, pass them and the node
    # name to the prep_fn, and serialise its return value
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
                                      zip(map(compat.snd, argdefs), args))
    pnbody = dict(
      (n,
       serializer.DumpJson(prep_fn(n, encode_args_fn(n)),
                           private_encoder=serializer.EncodeWithPrivateFields))
      for n in node_list
    )

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    if postproc_fn:
      # Apply the post-processing function to every per-node result
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
539
540 541 -def _ObjectToDict(_, value):
542 """Converts an object to a dictionary. 543 544 @note: See L{objects}. 545 546 """ 547 return value.ToDict()
548
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  """
  return [_ObjectToDict(node, item) for item in value]
555
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  Returns the (virtualized) file name, encoded contents and the file's
  mode, owner, group and timestamps.

  """
  stat_helper = utils.FileStatHelper()
  encoded = _Compress(node, utils.ReadFile(filename, preread=stat_helper))
  fstat = stat_helper.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts
  entities = getents_fn()

  return [vcluster.MakeVirtualPath(filename), encoded, fstat.st_mode,
          entities.LookupUid(fstat.st_uid), entities.LookupGid(fstat.st_gid),
          fstat.st_atime, fstat.st_mtime]
574
575 576 -def _PrepareFinalizeExportDisks(_, snap_disks):
577 """Encodes disks for finalizing export. 578 579 """ 580 flat_disks = [] 581 582 for disk in snap_disks: 583 if isinstance(disk, bool): 584 flat_disks.append(disk) 585 else: 586 flat_disks.append(disk.ToDict()) 587 588 return flat_disks
589
590 591 -def _EncodeBlockdevRename(_, value):
592 """Encodes information for renaming block devices. 593 594 """ 595 return [(d.ToDict(), uid) for d, uid in value]
596
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
     unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if not pv_info:
    # No LVM physical volume information available
    result["spindles_free"] = 0
    result["spindles_total"] = 0
    return

  result["spindles_free"] = pv_info["storage_free"]
  result["spindles_total"] = pv_info["storage_size"]
619
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
639
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by call_node_info.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  """
  (bootid, space_info, (hv_info, )) = data

  legacy = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
  _AddSpindlesToLegacyNodeInfo(legacy, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(legacy, space_info, disk_template)
  return legacy
656
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
  """Annotates just DRBD disks layouts.

  Fills the parameters of the DRBD disk itself and of its two children
  (data and metadata devices). Modifies C{disk} in place and returns it.

  """
  assert disk.dev_type == constants.DT_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  # DRBD8 disks always have exactly two children: data and metadata
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
670
def _AnnotateDParamsGeneric(disk, (params, )):
  """Generic disk parameter annotation routine.

  Used for all non-DRBD disk types; modifies C{disk} in place and
  returns it.

  """
  assert disk.dev_type != constants.DT_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
681
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _Annotate(disk):
    # Diskless disks carry no parameters
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    if disk.dev_type == constants.DT_DRBD8:
      # DRBD needs its children (data/meta) annotated as well
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Work on copies so the original objects stay untouched
  return [_Annotate(disk.Copy()) for disk in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Reads the exclusive_storage node parameter for a single node.

  @raise errors.OpPrereqError: if the node is not in the configuration

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
711
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  annotated = []
  for (storage_type, storage_key) in storage_units:
    if storage_type not in [constants.ST_LVM_VG]:
      # Non-LVM units get an empty parameter list
      annotated.append((storage_type, storage_key, []))
      continue

    annotated.append((storage_type, storage_key, [es_flag]))
    if es_flag:
      # With exclusive storage, physical volumes are reported as well
      annotated.append((constants.ST_LVM_PV, storage_key, [es_flag]))
  return annotated
738
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
756
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                 storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
#: Generic encoders usable without a configuration object; encoders that
#: need access to the cluster configuration are added per-instance in
#: L{RpcRunner.__init__} and L{ConfigRunner.__init__}
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Full-featured RPC client, resolving nodes through the cluster
  configuration and supporting configuration-dependent argument encoders.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initialized the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request processor override (for tests)
    @param _getents: entity-resolver override (for tests)

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep copy so filling in netinfo doesn't modify the original NIC
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Encodes a NIC or disk device for RPC transport.

    NOTE(review): returns None for any other device type -- presumably
    callers only ever pass NICs or disks; verify.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill cluster-level defaults first, then apply explicit overrides
    idict["hvparams"] = cluster.FillHV(instance)
    idict["secondary_nodes"] = \
      self._cfg.GetInstanceSecondaryNodes(instance.uuid)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    disks = self._cfg.GetInstanceDisks(instance.uuid)
    idict["disks_info"] = self._DisksDictDP(node, (disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    Annotates the disks with the instance's disk parameters and the
    dynamic parameters (node IPs), then serializes them.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    Disk arguments are annotated and serialized; everything else is
    passed through unchanged.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    return (ieio, ieioargs)
979
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, _context, address_list):
    """Initializes this class.

    @param address_list: static address list, or None to resolve through
        the ssconf primary-IP list

    """
    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=lambda _: None)
    _generated_rpc.RpcClientJobQueue.__init__(self)
998
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Node names are resolved through the ssconf primary-IP list
    resolver = compat.partial(_SsconfResolver, True)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1018
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # ssconf_ips=False: skip the ssconf IP list, resolve via DNS only
    resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1031
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, _context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    @param address_list: static address list, or None to resolve through
        the ssconf primary-IP list
    @param _req_process_fn: request processor override (for tests)
    @param _getents: entity-resolver override (for tests)

    """
    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    encoders = _ENCODERS.copy()
    # File uploads need the (possibly overridden) entity resolver
    encoders[rpc_defs.ED_FILE_DETAILS] = compat.partial(_PrepareFileUpload,
                                                        _getents)

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=None,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
1060