Package ganeti :: Package rpc :: Module node
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Inter-node RPC library. 
  32   
  33  """ 
  34   
  35  # pylint: disable=C0103,R0201,R0904 
  36  # C0103: Invalid name, since call_ are not valid 
  37  # R0201: Method could be a function, we keep all rpcs instance methods 
  38  # as not to change them back and forth between static/instance methods 
  39  # if they need to start using instance attributes 
  40  # R0904: Too many public methods 
  41   
  42  import logging 
  43  import zlib 
  44  import base64 
  45  import pycurl 
  46  import threading 
  47  import copy 
  48  import os 
  49   
  50  from ganeti import utils 
  51  from ganeti import objects 
  52  from ganeti import http 
  53  from ganeti import serializer 
  54  from ganeti import constants 
  55  from ganeti import errors 
  56  from ganeti import netutils 
  57  from ganeti import ssconf 
  58  from ganeti import runtime 
  59  from ganeti import compat 
  60  from ganeti import rpc_defs 
  61  from ganeti import pathutils 
  62  from ganeti import vcluster 
  63   
  64  # Special module generated at build time 
  65  from ganeti import _generated_rpc 
  66   
  67  # pylint has a bug here, doesn't see this import 
  68  import ganeti.http.client  # pylint: disable=W0611 
  69   
  70   
#: HTTP headers sent with every RPC request; the empty "Expect:" header
#: suppresses curl's default "Expect: 100-continue" handshake on POSTs.
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
_OFFLINE = object()
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3)/curl_global_cleanup(3) are only safe while a single
  # thread exists; this assertion is a best-effort guard, not a full check.
  active = threading.activeCount()
  assert active == 1, \
       "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
def Shutdown():
  """Tears down the module-global HTTP client manager.

  Must run right before the program quits, and — like L{Init} — only while
  exactly one thread is active.

  """
  pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node-daemon RPC calls.

  Sets up TLS client authentication (certificate and key), certificate
  verification against the node daemon CA and the connect timeout.

  @type curl: pycurl.Curl
  @param curl: cURL handle to configure

  """
  noded_cert = pathutils.NODED_CERT_FILE
  noded_client_cert = pathutils.NODED_CLIENT_CERT_FILE

  # FIXME: The next two lines are necessary to ensure upgradability from
  # 2.10 to 2.11. Remove in 2.12, because this slows down RPC calls.
  if not os.path.exists(noded_client_cert):
    # Fixed: the two literals previously concatenated without a space,
    # logging the word "RPCcall"; also use the non-deprecated warning()
    logging.warning("Using server certificate as client certificate for RPC"
                    " call.")
    noded_client_cert = noded_cert

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_client_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_client_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
def RunWithRPC(fn):
  """Decorator running a function inside an RPC init/shutdown bracket.

  The wrapped function executes with the RPC system initialized and the
  system is shut down afterwards, even on exceptions; hence the function
  must be invoked while RPC is *not* yet initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always clean up, whether fn returned or raised
      Shutdown()
  return wrapper
def _Compress(_, data):
  """Prepares a string for transport over RPC.

  Payloads shorter than 512 bytes are sent verbatim; larger ones are
  zlib-compressed (level 3) and base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Large enough that compression pays off; base64 keeps it JSON-safe
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Small payload: skip the compression overhead
  return (constants.RPC_ENCODING_NONE, data)
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Initializes the result, classifying it as offline/failed/success.

    A successful payload must be a two-element sequence of
    (status, payload); anything else is turned into an RPC-layer error.

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  def __repr__(self):
    # Bug fix: the first placeholder is labelled "data" but the original
    # code passed self.offline twice; report self.data as intended
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
            (self.data, self.call, self.node, self.offline, self.fail_msg))

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: context message, or None/empty for a generated default
    @param prereq: if True, raise OpPrereqError instead of OpExecError
    @param ecode: optional error code passed to the exception

    """
    if not self.fail_msg:
      return

    if not msg:  # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args)  # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  # Seed the name->IP map from ssconf if requested; DNS fills in the rest
  if ssconf_ips:
    ipmap = dict(entry.split() for entry in store.GetNodePrimaryIPList())
  else:
    ipmap = {}

  result = []
  for node in node_list:
    addr = ipmap.get(node)
    if addr is None:
      addr = nslookup_fn(node, family=family)
    result.append((node, addr, node))

  return result
304 305 -class _StaticResolver:
306 - def __init__(self, addresses):
307 """Initializes this class. 308 309 """ 310 self._addresses = addresses
311
312 - def __call__(self, hosts, _):
313 """Returns static addresses for hosts. 314 315 """ 316 assert len(hosts) == len(self._addresses) 317 return zip(hosts, self._addresses, hosts)
318
319 320 -def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
321 """Checks if a node is online. 322 323 @type node_uuid_or_name: string 324 @param node_uuid_or_name: Node UUID 325 @type node: L{objects.Node} or None 326 @param node: Node object 327 328 """ 329 if node is None: 330 # Assume that the passed parameter was actually a node name, so depend on 331 # DNS for name resolution 332 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) 333 else: 334 if node.offline and not accept_offline_node: 335 ip = _OFFLINE 336 else: 337 ip = node.primary_ip 338 return (node.name, ip, node_uuid_or_name)
339
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  # Fast path: a single host avoids fetching the entire node list
  if len(node_uuids) == 1:
    (uuid, ) = node_uuids
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]

  all_nodes = all_nodes_fn()
  return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
                           accept_offline_node)
          for uuid in node_uuids]
class _RpcProcessor:
  """Resolves node addresses and executes HTTP requests for an RPC call.

  Ties together a resolver, the node daemon TCP port and (optionally) the
  lock monitor; instances are called with the full per-call parameters.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type body: dict
    @param body: a dictionary with per-host body data
    @return: tuple of (results, requests); offline hosts get a ready-made
      L{RpcResult} in C{results}, all others an HTTP request in C{requests},
      both keyed by the node's original (pre-resolution) name

    """
    results = {}
    requests = {}

    # Sanity checks: one body per host, already serialized to strings, and
    # keyed by the same original names the resolver returned
    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dict of offline-host results, updated in place
    @param requests: dict of executed HTTP requests, one per online host
    @return: the combined C{results} dictionary

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # No host may appear both as an offline result and a live request
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
class _RpcClientBase:
  """Common base for the generated RPC client classes.

  Provides argument encoding and the L{_Call} entry point used by the
  code generated in L{_generated_rpc}.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: name-to-address resolver passed to L{_RpcProcessor}
    @param encoder_fn: callable mapping an argument kind to its encoder
    @param lock_monitor_cb: Callable for registering with lock monitor
    @param _req_process_fn: request processor override, for tests

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, node, (argkind, value)):
    """Encode argument.

    A C{None} kind means the value is passed through unmodified.

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(node, value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to contact
    @param args: positional call arguments, one per argument definition

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Both the timeout and resolver options may be static values or
    # callables computed from the call arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    if prep_fn is None:
      prep_fn = lambda _, args: args
    assert callable(prep_fn)

    # encode the arguments for each node individually, pass them and the node
    # name to the prep_fn, and serialise its return value
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
                                      zip(map(compat.snd, argdefs), args))
    pnbody = dict(
      (n,
       serializer.DumpJson(prep_fn(n, encode_args_fn(n)),
                           private_encoder=serializer.EncodeWithPrivateFields))
      for n in node_list
      )

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    # Optionally post-process each per-node result before returning
    if postproc_fn:
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
538 539 -def _ObjectToDict(_, value):
540 """Converts an object to a dictionary. 541 542 @note: See L{objects}. 543 544 """ 545 return value.ToDict()
546
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  """
  return [_ObjectToDict(node, elem) for elem in value]
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  Reads the file while capturing its stat information, compresses the
  contents and resolves ownership to symbolic user/group names.

  """
  stat_helper = utils.FileStatHelper()
  payload = _Compress(node, utils.ReadFile(filename, preread=stat_helper))
  fstat = stat_helper.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts
  entries = getents_fn()

  # Translate the local path into its virtual-cluster equivalent
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, payload, fstat.st_mode,
          entries.LookupUid(fstat.st_uid), entries.LookupGid(fstat.st_gid),
          fstat.st_atime, fstat.st_mtime]
573 574 -def _PrepareFinalizeExportDisks(_, snap_disks):
575 """Encodes disks for finalizing export. 576 577 """ 578 flat_disks = [] 579 580 for disk in snap_disks: 581 if isinstance(disk, bool): 582 flat_disks.append(disk) 583 else: 584 flat_disks.append(disk.ToDict()) 585 586 return flat_disks
587
588 589 -def _EncodeBlockdevRename(_, value):
590 """Encodes information for renaming block devices. 591 592 """ 593 return [(d.ToDict(), uid) for d, uid in value]
594
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if pv_info:
    free = pv_info["storage_free"]
    total = pv_info["storage_size"]
  else:
    # No LVM physical volume information available
    free = total = 0
  result["spindles_free"] = free
  result["spindles_total"] = total
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by call_node_info.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  """
  (bootid, space_info, (hv_info, )) = data

  # Start from the hypervisor data plus the boot ID, then graft spindle
  # and storage information onto the same dictionary
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
  _AddSpindlesToLegacyNodeInfo(ret, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)

  return ret
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
  """Annotates just DRBD disks layouts.

  @type disk: L{objects.Disk}
  @param disk: DRBD8 disk object, modified in place
  @param drbd_params: parameters for the DRBD device itself
  @param data_params: parameters for the data child device
  @param meta_params: parameters for the metadata child device
  @return: the annotated disk

  """
  assert disk.dev_type == constants.DT_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  # A DRBD8 disk has exactly two children: data and metadata devices
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
def _AnnotateDParamsGeneric(disk, (params, )):
  """Generic disk parameter annotation routine.

  @type disk: L{objects.Disk}
  @param disk: non-DRBD disk object, modified in place
  @param params: single-element tuple holding the parameters to fill in
  @return: the annotated disk

  """
  assert disk.dev_type != constants.DT_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _Annotate(disk):
    # Diskless devices carry no parameters to annotate
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Work on copies so the originals stay untouched
  return [_Annotate(disk.Copy()) for disk in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Reads a node's exclusive-storage flag from the configuration.

  @raise errors.OpPrereqError: if the node is unknown to the configuration

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  result = []
  for (storage_type, storage_key) in storage_units:
    if storage_type not in [constants.ST_LVM_VG]:
      result.append((storage_type, storage_key, []))
      continue
    result.append((storage_type, storage_key, [es_flag]))
    if es_flag:
      # With exclusive storage also report the physical volumes
      result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
  return result
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                   storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
#: Generic encoders
#: Maps argument-definition kinds (see L{rpc_defs}) to the module-level
#: functions that encode values of that kind for transport; encoders that
#: need a configuration object live on L{RpcRunner} instead.
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Full-featured RPC client: resolves nodes through the cluster
  configuration and provides encoders that need configuration access.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initialized the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request processor override, for tests
    @param _getents: getent provider override, for tests

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep copy so attaching netinfo doesn't modify the original NIC
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Converts a NIC or disk device (with its instance) to a dict.

    NOTE(review): devices of any other type fall through and yield
    None implicitly — presumably never happens; confirm with callers.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
      cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill hv/be/os parameters from cluster defaults, then apply overrides
    idict["hvparams"] = cluster.FillHV(instance)
    idict["secondary_nodes"] = \
      self._cfg.GetInstanceSecondaryNodes(instance.uuid)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    disks = self._cfg.GetInstanceDisks(instance.uuid)
    idict["disks_info"] = self._DisksDictDP(node, (disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    Annotates the disks with their parameters and dynamic parameters
    (secondary IPs of the nodes involved) before serializing them.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    Disk arguments are annotated before transmission; other argument
    kinds are passed through unchanged.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    return (ieio, ieioargs)
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, _context, address_list):
    """Initializes this class.

    """
    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to resolving through ssconf-provided IPs
      resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=lambda _: None)
    _generated_rpc.RpcClientJobQueue.__init__(self)
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Bootstrap calls always resolve via the ssconf-provided IPs
    resolver = compat.partial(_SsconfResolver, True)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # Resolve through DNS only; ssconf IPs are explicitly disabled here
    dns_resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, dns_resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, _context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    # The configuration client never registers with the lock monitor
    lock_monitor_cb = None

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = compat.partial(_SsconfResolver, True)

    # Extend the generic encoders with the file-upload encoder, which
    # needs the (optionally overridden) getent provider
    encoders = _ENCODERS.copy()
    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)