Package ganeti :: Package rpc :: Module node
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.rpc.node

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Inter-node RPC library. 
  32   
  33  """ 
  34   
  35  # pylint: disable=C0103,R0201,R0904 
  36  # C0103: Invalid name, since call_ are not valid 
  37  # R0201: Method could be a function, we keep all rpcs instance methods 
  38  # as not to change them back and forth between static/instance methods 
  39  # if they need to start using instance attributes 
  40  # R0904: Too many public methods 
  41   
  42  import logging 
  43  import zlib 
  44  import base64 
  45  import pycurl 
  46  import threading 
  47  import copy 
  48  import os 
  49   
  50  from ganeti import utils 
  51  from ganeti import objects 
  52  from ganeti import http 
  53  from ganeti import serializer 
  54  from ganeti import constants 
  55  from ganeti import errors 
  56  from ganeti import netutils 
  57  from ganeti import ssconf 
  58  from ganeti import runtime 
  59  from ganeti import compat 
  60  from ganeti import rpc_defs 
  61  from ganeti import pathutils 
  62  from ganeti import vcluster 
  63   
  64  # Special module generated at build time 
  65  from ganeti import _generated_rpc 
  66   
  67  # pylint has a bug here, doesn't see this import 
  68  import ganeti.http.client  # pylint: disable=W0611 
  69   
  70   
  71  _RPC_CLIENT_HEADERS = [ 
  72    "Content-type: %s" % http.HTTP_APP_JSON, 
  73    "Expect:", 
  74    ] 
  75   
  76  #: Special value to describe an offline host 
  77  _OFFLINE = object() 
def Init():
  """Set up the module-global HTTP client machinery.

  Callers must invoke this before issuing any RPC and while the process
  still has exactly one running thread.

  """
  # Per curl_global_init(3)/curl_global_cleanup(3), libcurl's global state
  # may only be initialized while single-threaded; this assertion is a
  # best-effort guard, not a complete check.
  assert threading.activeCount() == 1, \
       "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
96
def Shutdown():
  """Tear down the module-global HTTP client machinery.

  Call this once right before the program exits, again while only a
  single thread is running.

  """
  pycurl.global_cleanup()
106
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node-daemon RPC.

  Sets up TLS client authentication against the node daemon's CA
  certificate and the connection timeout.

  @type curl: pycurl.Curl
  @param curl: cURL handle to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)
  noded_client_cert = str(pathutils.NODED_CLIENT_CERT_FILE)

  # FIXME: The next two lines are necessary to ensure upgradability from
  # 2.10 to 2.11. Remove in 2.12, because this slows down RPC calls.
  if not os.path.exists(noded_client_cert):
    # Bug fix: the concatenated message used to read "...for RPCcall."
    # because of a missing space between the string parts
    logging.info("Using server certificate as client certificate for RPC"
                 " call.")
    noded_client_cert = noded_cert

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # Host name verification is disabled; the peer certificate is still
  # validated against the CA file above
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_client_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  # Certificate and key live in the same PEM file
  curl.setopt(pycurl.SSLKEY, noded_client_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
128
def RunWithRPC(fn):
  """Decorator running a function inside an initialized RPC system.

  The returned wrapper brings the RPC layer up, invokes C{fn}, and always
  shuts the layer down afterwards.  Consequently the wrapped function must
  be called while RPC is *not* yet initialized.

  """
  def _Wrapped(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()
  return _Wrapped
def _Compress(_, data):
  """Prepares a string for transport over RPC.

  Small payloads are shipped verbatim; larger ones are zlib-compressed
  and base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Worth compressing: zlib level 3, base64 for transport
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Tiny payload, send as-is
  return (constants.RPC_ENCODING_NONE, data)
165
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # A well-formed result is a (success, payload) pair
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  def __repr__(self):
    # Bug fix: the original formatted self.offline into the "data=%s"
    # slot, so the payload was never shown
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
            (self.data, self.call, self.node, self.offline, self.fail_msg))

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: message prefix; if None/empty, a default message is built
    @param prereq: if True, raise OpPrereqError instead of OpExecError
    @param ecode: optional error code passed to the exception

    """
    if not self.fail_msg:
      return

    if not msg:  # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
267
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is "<name> <ip>"
    ipmap = dict(entry.split()
                 for entry in store.GetNodePrimaryIPList())
  else:
    ipmap = {}

  resolved = []
  for name in node_list:
    address = ipmap.get(name)
    if address is None:
      # Not in ssconf (or ssconf disabled): resolve via DNS
      address = nslookup_fn(name, family=family)
    resolved.append((name, address, name))

  return resolved
303
class _StaticResolver:
  """Resolver returning a fixed, pre-configured address per host.

  Used when the caller already knows the addresses and no configuration
  or DNS lookup is needed.

  """
  def __init__(self, addresses):
    """Initializes this class.

    @param addresses: addresses handed out by L{__call__}, one per host,
        in the same order as the hosts passed there

    """
    self._addresses = addresses

  def __call__(self, hosts, _):
    """Returns static addresses for hosts.

    @return: list of (host, address, host) triples, mirroring the other
        resolvers' return shape

    """
    # One stored address per requested host, positionally matched
    assert len(hosts) == len(self._addresses)
    return zip(hosts, self._addresses, hosts)
318
319 320 -def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
321 """Checks if a node is online. 322 323 @type node_uuid_or_name: string 324 @param node_uuid_or_name: Node UUID 325 @type node: L{objects.Node} or None 326 @param node: Node object 327 328 """ 329 if node is None: 330 # Assume that the passed parameter was actually a node name, so depend on 331 # DNS for name resolution 332 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) 333 else: 334 if node.offline and not accept_offline_node: 335 ip = _OFFLINE 336 else: 337 ip = node.primary_ip 338 return (node.name, ip, node_uuid_or_name)
339
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is
  not found in the configuration.

  """
  accept_offline = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline or opts is None, "Unknown option"

  if len(node_uuids) != 1:
    nodes = all_nodes_fn()
    return [_CheckConfigNode(uuid, nodes.get(uuid, None), accept_offline)
            for uuid in node_uuids]

  # Single-host lookup: avoid fetching the whole node list
  (uuid, ) = node_uuids
  return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline)]
361
class _RpcProcessor:
  """Resolves node addresses, issues the HTTP requests and wraps the
  responses into L{RpcResult} objects.

  Offline nodes never get a request; they receive a pre-computed
  offline result instead.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type body: dict
    @param body: a dictionary with per-host body data
    @return: tuple of (results, requests) dicts, both keyed by the
        original (pre-resolution) node name

    """
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    # Body keys must match the original names returned by the resolver
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline; produce the result now instead of
        # issuing a request
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dict of pre-computed (offline) results; updated in place
    @param requests: dict of completed HTTP requests
    @return: the merged C{results} dict

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A node is either offline (pre-computed result) or was queried, never both
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
470
class _RpcClientBase:
  """Base class for the generated RPC client mixins.

  Provides per-node argument encoding and request dispatch via
  L{_RpcProcessor}.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: address resolver passed through to L{_RpcProcessor}
    @param encoder_fn: callable mapping an argument kind to its encoder
    @param lock_monitor_cb: Callable for registering with lock monitor
    @param _req_process_fn: request-processing override, for tests

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, node, (argkind, value)):
    """Encode argument.

    Arguments without a declared kind are passed through unchanged.

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(node, value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to contact
    @param args: positional call arguments, matching the definition's
        argument list

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout and resolver options may be static values or callables
    # computed from the arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    if prep_fn is None:
      prep_fn = lambda _, args: args
    assert callable(prep_fn)

    # encode the arguments for each node individually, pass them and the node
    # name to the prep_fn, and serialise its return value
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
                                      zip(map(compat.snd, argdefs), args))
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
                  for n in node_list)

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    # Optionally post-process each per-node result
    if postproc_fn:
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
533
534 535 -def _ObjectToDict(_, value):
536 """Converts an object to a dictionary. 537 538 @note: See L{objects}. 539 540 """ 541 return value.ToDict()
542
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  """
  encode_one = compat.partial(_ObjectToDict, node)
  return map(encode_one, value)
549
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  Returns the virtualized path, the (possibly compressed) contents and
  the file's mode, ownership and timestamps.

  """
  if getents_fn is None:
    getents_fn = runtime.GetEnts
  getents = getents_fn()

  # FileStatHelper captures the stat result while the file is still open
  stat_helper = utils.FileStatHelper()
  payload = _Compress(node, utils.ReadFile(filename, preread=stat_helper))
  fstat = stat_helper.st

  return [vcluster.MakeVirtualPath(filename), payload, fstat.st_mode,
          getents.LookupUid(fstat.st_uid), getents.LookupGid(fstat.st_gid),
          fstat.st_atime, fstat.st_mtime]
568
569 570 -def _PrepareFinalizeExportDisks(_, snap_disks):
571 """Encodes disks for finalizing export. 572 573 """ 574 flat_disks = [] 575 576 for disk in snap_disks: 577 if isinstance(disk, bool): 578 flat_disks.append(disk) 579 else: 580 flat_disks.append(disk.ToDict()) 581 582 return flat_disks
583
584 585 -def _EncodeBlockdevRename(_, value):
586 """Encodes information for renaming block devices. 587 588 """ 589 return [(d.ToDict(), uid) for d, uid in value]
590
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if pv_info:
    result["spindles_free"] = pv_info["storage_free"]
    result["spindles_total"] = pv_info["storage_size"]
  else:
    # No LVM PV data available: report zero spindles
    result["spindles_free"] = 0
    result["spindles_total"] = 0
613
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
633
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by call_node_info.

  Converts the data into a single dictionary. This is fine for most use
  cases, but some require information from more than one volume group or
  hypervisor.

  """
  # Exactly one hypervisor's info is expected in the legacy format
  (bootid, space_info, (hv_info, )) = data

  legacy = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
  _AddSpindlesToLegacyNodeInfo(legacy, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(legacy, space_info, disk_template)
  return legacy
650
def _AnnotateDParamsDRBD(disk, ld_params):
  """Annotates just DRBD disks layouts.

  @param ld_params: triple of (drbd_params, data_params, meta_params)

  """
  (drbd_params, data_params, meta_params) = ld_params
  assert disk.dev_type == constants.DT_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  # DRBD disks carry exactly two children: the data and the metadata device
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
664
def _AnnotateDParamsGeneric(disk, ld_params):
  """Generic disk parameter annotation routine.

  @param ld_params: single-element tuple containing the parameter dict

  """
  (params, ) = ld_params
  assert disk.dev_type != constants.DT_DRBD8

  disk.params = objects.FillDict(params, disk.params)
  return disk
675
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _Annotate(disk):
    # Diskless entries carry no parameters
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Annotate copies so the caller's disk objects stay untouched
  return [_Annotate(d.Copy()) for d in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Returns the exclusive storage node parameter of the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuid: string
  @param node_uuid: node UUID (or name) to look up
  @raise errors.OpPrereqError: if the node is not found in the configuration

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
705
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  annotated = []
  for (storage_type, storage_key) in storage_units:
    if storage_type == constants.ST_LVM_VG:
      annotated.append((storage_type, storage_key, [es_flag]))
      if es_flag:
        # With exclusive storage enabled, PV space must be reported as well
        annotated.append((constants.ST_LVM_PV, storage_key, [es_flag]))
    else:
      annotated.append((storage_type, storage_key, []))
  return annotated
732
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
750
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                 storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)


#: Generic encoders
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Full-featured RPC client resolving node addresses through the cluster
  configuration and providing config-dependent argument encoders.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initializes the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request-processing override, for tests
    @param _getents: entity-resolver override for file uploads, for tests

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep-copy so that attaching netinfo does not modify the caller's NIC
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Encodes a NIC or disk device for RPC transmission.

    Returns None for device types other than NIC/Disk.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill cluster-level defaults first, then apply explicit overrides
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    Annotates the disks with their parameters and dynamic parameters
    (secondary IPs of the nodes the disk spans) before serialization.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    Disk arguments are encoded via L{_SingleDiskDictDP}; other I/O kinds
    pass their arguments through unchanged.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    return (ieio, ieioargs)
970
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is not None:
      # Caller provided an address list
      address_resolver = _StaticResolver(address_list)
    else:
      # Fall back to resolving through ssconf
      address_resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, address_resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
989
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    # ssconf_ips=True: prefer the IPs recorded in ssconf over DNS
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1009
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # ssconf_ips=False: skip the ssconf IP map so every node name is
    # resolved via DNS
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1022
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    # Lock monitoring is only available when a context was supplied
    lock_monitor_cb = context.glm.AddToLockMonitor if context else None

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      # Resolve node addresses through ssconf
      resolver = compat.partial(_SsconfResolver, True)

    # File uploads additionally need the entity-resolution helper
    encoders = _ENCODERS.copy()
    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
1054