Package ganeti :: Module rpc
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Inter-node RPC library. 
  23   
  24  """ 
  25   
  26  # pylint: disable-msg=C0103,R0201,R0904 
  27  # C0103: Invalid name, since call_ are not valid 
  28  # R0201: Method could be a function, we keep all rpcs instance methods 
  29  # as not to change them back and forth between static/instance methods 
  30  # if they need to start using instance attributes 
  31  # R0904: Too many public methods 
  32   
  33  import os 
  34  import logging 
  35  import zlib 
  36  import base64 
  37  import pycurl 
  38  import threading 
  39   
  40  from ganeti import utils 
  41  from ganeti import objects 
  42  from ganeti import http 
  43  from ganeti import serializer 
  44  from ganeti import constants 
  45  from ganeti import errors 
  46  from ganeti import netutils 
  47   
  48  # pylint has a bug here, doesn't see this import 
  49  import ganeti.http.client  # pylint: disable-msg=W0611 
  50   
  51   
  52  # Timeout for connecting to nodes (seconds) 
  53  _RPC_CONNECT_TIMEOUT = 5 
  54   
  55  _RPC_CLIENT_HEADERS = [ 
  56    "Content-type: %s" % http.HTTP_APP_JSON, 
  57    "Expect:", 
  58    ] 
  59   
  60  # Various time constants for the timeout table 
  61  _TMO_URGENT = 60 # one minute 
  62  _TMO_FAST = 5 * 60 # five minutes 
  63  _TMO_NORMAL = 15 * 60 # 15 minutes 
  64  _TMO_SLOW = 3600 # one hour 
  65  _TMO_4HRS = 4 * 3600 
  66  _TMO_1DAY = 86400 
  67   
  68  # Timeout table that will be built later by decorators 
  69  # Guidelines for choosing timeouts: 
  70  # - call used during watcher: timeout -> 1min, _TMO_URGENT 
  71  # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST 
  72  # - other calls: 15 min, _TMO_NORMAL 
  73  # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts 
  74   
  75  _TIMEOUTS = { 
  76  } 
def Init():
  """Initializes the module-global HTTP client machinery (pycURL).

  Has to be called before any RPC function is used, while the process
  runs exactly one thread.

  """
  # curl_global_init(3)/curl_global_cleanup(3) require a single running
  # thread; this assertion is only a safety net and does not catch every
  # violation
  thread_count = threading.activeCount()
  assert thread_count == 1, \
         "Found more than one active thread when initializing pycURL"

  curl_version = pycurl.version
  logging.info("Using PycURL %s", curl_version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
def Shutdown():
  """Tears down the module-global HTTP client machinery (pycURL).

  Counterpart of L{Init}: has to be called before the program quits,
  while the process runs exactly one thread.

  """
  cleanup = pycurl.global_cleanup
  cleanup()
def _ConfigRpcCurl(curl):
  """Configures a cURL handle for talking to the node daemon.

  The node daemon certificate file is used as the CA bundle and as the
  client certificate and key. Peer certificate verification is on, but
  host name verification is off (C{SSL_VERIFYHOST} is 0). Redirects are
  not followed, and connecting is limited to L{_RPC_CONNECT_TIMEOUT}
  seconds.

  @param curl: the cURL handle to configure

  """
  cert_file = str(constants.NODED_CERT_FILE)

  # (option, value) pairs, applied in this exact order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_file),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_file),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_file),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]
  for (option, value) in settings:
    curl.setopt(option, value)
120 121 -class _RpcThreadLocal(threading.local):
122 - def GetHttpClientPool(self):
123 """Returns a per-thread HTTP client pool. 124 125 @rtype: L{http.client.HttpClientPool} 126 127 """ 128 try: 129 pool = self.hcp 130 except AttributeError: 131 pool = http.client.HttpClientPool(_ConfigRpcCurl) 132 self.hcp = pool 133 134 return pool
135 136 137 _thread_local = _RpcThreadLocal()
def _RpcTimeout(secs):
  """Builds a decorator that records an RPC call's timeout.

  The returned decorator must be applied to a C{call_*} function. It
  stores C{secs} in the global timeout table under the function's name
  without the C{call_} prefix. It returns the function unchanged.

  @param secs: the timeout, in seconds, for the decorated call

  """
  prefix = "call_"

  def decorator(fn):
    fn_name = fn.__name__
    assert fn_name.startswith(prefix)
    _TIMEOUTS[fn_name[len(prefix):]] = secs
    return fn

  return decorator
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shuts the system down afterwards (even if the
  function raises). This means the function must be called without RPC
  being initialized.

  The wrapper takes over the wrapped function's name, docstring, module
  and attribute dictionary, so that logging and introspection still
  refer to the original function.

  @param fn: the function to wrap
  @return: a wrapper that calls L{Init} before and L{Shutdown} after C{fn}

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()

  # Hand-written equivalent of functools.wraps, avoiding a dependency on
  # the functools module; attributes missing on fn (e.g. on some callable
  # objects) are simply not copied
  for attr in ("__module__", "__name__", "__doc__"):
    if hasattr(fn, attr):
      setattr(wrapper, attr, getattr(fn, attr))
  wrapper.__dict__.update(getattr(fn, "__dict__", {}))
  return wrapper
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed because in multi-node
  calls we can't raise an exception just because one out of many
  failed. Instead each node's result is wrapped in this class.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed, otherwise None
  @ivar payload: the second element of a successful (status, payload)
      response, or None on failure

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = None
      self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = None
      self.payload = None
    else:
      # A well-formed response is a (status, payload) pair
      self.data = data
      self.payload = None
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(data))
      elif not data[0]:
        self.fail_msg = self._EnsureErr(data[1])
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ("call", "data", "fail_msg", "node", "offline",
                      "payload"):
      assert hasattr(self, attr_name)

  @staticmethod
  def _EnsureErr(val):
    """Returns C{val}, or a placeholder message if C{val} is false."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """Raises an exception if the result has failed.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: message prefix; if false, a default message naming the
        call and node is used
    @param prereq: raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: optional error code passed as second exception argument
    @raise errors.OpPrereqError: if failed and C{prereq} is true
    @raise errors.OpExecError: if failed and C{prereq} is false

    """
    if not self.fail_msg:
      return

    if msg:
      full_msg = "%s: %s" % (msg, self.fail_msg)
    else:
      # one could pass None for default message
      full_msg = ("Call '%s' to node '%s' has failed: %s" %
                  (self.call, self.node, self.fail_msg))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    exc_args = [full_msg]
    if ecode is not None:
      exc_args.append(ecode)
    raise exc_class(*exc_args) # pylint: disable-msg=W0142
class Client:
  """RPC Client class.

  Given a (remote) procedure name, a request body and a port, this class
  contacts (in parallel) all the nodes added via L{ConnectNode} or
  L{ConnectList}, and returns a dict of results (key: node name, value:
  L{RpcResult}).

  One current bug is that generic failure is still signaled by
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  def __init__(self, procedure, body, port):
    """Prepares a client for one remote procedure.

    @type procedure: str
    @param procedure: the remote procedure name; must be registered in
        the timeout table
    @param body: the (already serialized) request body
    @param port: the port of the node daemon

    """
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    self.procedure = procedure
    self.body = body
    self.port = port
    # node name -> pending HTTP request
    self._request = {}

  def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Add a list of nodes to the target nodes.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation

    """
    if address_list is not None:
      assert len(node_list) == len(address_list), \
             "Name and address lists should have the same length"
    else:
      address_list = [None for _ in node_list]

    for (name, address) in zip(node_list, address_list):
      self.ConnectNode(name, address, read_timeout=read_timeout)

  def ConnectNode(self, name, address=None, read_timeout=None):
    """Add a node to the target list.

    @type name: str
    @param name: the node name
    @type address: str
    @keyword address: the node address, if known; the node name is used
        otherwise
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout (taken from
        the timeout table) for this request

    """
    if read_timeout is None:
      read_timeout = _TIMEOUTS[self.procedure]
    if address is None:
      address = name

    path = str("/%s" % self.procedure)
    self._request[name] = \
      http.client.HttpClientRequest(str(address), self.port,
                                    http.HTTP_PUT, path,
                                    headers=_RPC_CLIENT_HEADERS,
                                    post_data=str(self.body),
                                    read_timeout=read_timeout)

  def GetResults(self, http_pool=None):
    """Call nodes and return results.

    @param http_pool: the HTTP client pool to use; when not given, the
        calling thread's pool is used
    @rtype: dict
    @return: node name -> L{RpcResult}

    """
    if not http_pool:
      http_pool = _thread_local.GetHttpClientPool()

    http_pool.ProcessRequests(self._request.values())

    results = {}
    for (name, req) in self._request.iteritems():
      if req.success and req.resp_status_code == http.HTTP_OK:
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
                                  node=name, call=self.procedure)
      else:
        # TODO: Better error reporting
        msg = req.error or req.resp_body
        logging.error("RPC error in %s from node %s: %s",
                      self.procedure, name, msg)
        results[name] = RpcResult(data=msg, failed=True, node=name,
                                  call=self.procedure)

    return results
def _EncodeImportExportIO(ieio, ieioargs):
  """Encodes import/export I/O information for transport over RPC.

  For C{constants.IEIO_RAW_DISK} (one argument) and
  C{constants.IEIO_SCRIPT} (two arguments), the first argument is
  serialized with its C{ToDict()} method. The arguments of any other I/O
  type are returned unchanged.

  @param ieio: the I/O type
  @param ieioargs: the I/O arguments
  @return: the encoded arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    first = ieioargs[0].ToDict()
    return (first, )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    first = ieioargs[0].ToDict()
    return (first, ieioargs[1])
  else:
    return ieioargs
371 372 -class RpcRunner(object):
373 """RPC runner class""" 374
def __init__(self, cfg):
  """Initializes the RPC runner.

  @type cfg: C{config.ConfigWriter}
  @param cfg: the configuration object used to look up cluster and node
      data (e.g. node addresses and offline flags)

  """
  self._cfg = cfg
  # node daemon port, used by the non-static calls
  self.port = netutils.GetDaemonPort(constants.NODED)
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
  """Serializes an instance, filling in cluster-level defaults.

  The instance is converted with its C{ToDict()} method. Its hypervisor,
  backend and OS parameters are then replaced by the cluster-filled
  versions, with the optional overrides applied on top. Every NIC's
  parameters are filled from the cluster's default NIC parameters.

  @type instance: L{objects.Instance}
  @param instance: an Instance object
  @type hvp: dict or None
  @param hvp: a dictionary with overridden hypervisor parameters
  @type bep: dict or None
  @param bep: a dictionary with overridden backend parameters
  @type osp: dict or None
  @param osp: a dictionary with overridden os parameters
  @rtype: dict
  @return: the instance dict with filled parameters

  """
  inst_data = instance.ToDict()
  cluster = self._cfg.GetClusterInfo()

  hv_params = cluster.FillHV(instance)
  if hvp is not None:
    hv_params.update(hvp)
  inst_data["hvparams"] = hv_params

  be_params = cluster.FillBE(instance)
  if bep is not None:
    be_params.update(bep)
  inst_data["beparams"] = be_params

  os_params = cluster.SimpleFillOS(instance.os, instance.osparams)
  if osp is not None:
    os_params.update(osp)
  inst_data["osparams"] = os_params

  for nic_data in inst_data["nics"]:
    nic_data["nicparams"] = objects.FillDict(
      cluster.nicparams[constants.PP_DEFAULT],
      nic_data["nicparams"])
  return inst_data
def _ConnectList(self, client, node_list, call, read_timeout=None):
  """Adds the reachable nodes of a list to a client.

  Nodes marked offline in the configuration are not contacted. An
  offline L{RpcResult} is produced for each of them instead. Nodes
  unknown to the configuration are added without an address.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @type node_list: list
  @param node_list: the node list we should connect
  @type call: string
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the
      given operation
  @rtype: dict
  @return: node name -> offline L{RpcResult}, for the skipped nodes

  """
  nodes_info = self._cfg.GetAllNodesInfo()
  offline_results = {}
  names = []
  addresses = []

  for name in node_list:
    if name not in nodes_info:
      address = None
    elif nodes_info[name].offline:
      offline_results[name] = RpcResult(node=name, offline=True, call=call)
      continue
    else:
      address = nodes_info[name].primary_ip
    names.append(name)
    addresses.append(address)

  if names:
    client.ConnectList(names, address_list=addresses,
                       read_timeout=read_timeout)
  return offline_results
def _ConnectNode(self, client, node, call, read_timeout=None):
  """Adds a single node to a client, unless it is marked offline.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @type node: str
  @param node: the node we should connect
  @type call: string
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the
      given operation
  @return: an offline L{RpcResult} if the node is marked offline,
      otherwise None (the node was added to the client)

  """
  info = self._cfg.GetNodeInfo(node)
  if info is None:
    address = None
  elif info.offline:
    return RpcResult(node=node, offline=True, call=call)
  else:
    address = info.primary_ip
  client.ConnectNode(node, address=address, read_timeout=read_timeout)
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
  """Performs one RPC procedure on several nodes.

  @return: node name -> L{RpcResult}, including offline results for
      nodes marked offline

  """
  request_body = serializer.DumpJson(args, indent=False)
  client = Client(procedure, request_body, self.port)
  results = self._ConnectList(client, node_list, procedure,
                              read_timeout=read_timeout)
  results.update(client.GetResults())
  return results
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Performs one RPC procedure on several nodes, without the config.

  Nodes are contacted directly (optionally at the given addresses),
  without consulting the configuration for offline flags.

  @return: node name -> L{RpcResult}

  """
  request_body = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)
  client = Client(procedure, request_body, noded_port)
  client.ConnectList(node_list, address_list=address_list,
                     read_timeout=read_timeout)
  return client.GetResults()
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
  """Performs one RPC procedure on a single node.

  @rtype: L{RpcResult}
  @return: the node's result, or an offline result if the node is
      marked offline

  """
  request_body = serializer.DumpJson(args, indent=False)
  client = Client(procedure, request_body, self.port)
  offline_result = self._ConnectNode(client, node, procedure,
                                     read_timeout=read_timeout)
  if offline_result is not None:
    return offline_result
  # the node was added to the client, i.e. it is not offline
  return client.GetResults()[node]
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Performs one RPC procedure on a single node, without the config.

  @rtype: L{RpcResult}

  """
  request_body = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)
  client = Client(procedure, request_body, noded_port)
  client.ConnectNode(node, read_timeout=read_timeout)
  all_results = client.GetResults()
  return all_results[node]
@staticmethod
def _Compress(data):
  """Encodes a string for transport over RPC.

  Data of 512 bytes or more is compressed with zlib (level 3) and
  base64-encoded; shorter data is sent as-is.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: (encoding constant, encoded data)

  """
  if len(data) >= 512:
    packed = base64.b64encode(zlib.compress(data, 3))
    return (constants.RPC_ENCODING_ZLIB_BASE64, packed)
  # Small amounts of data are not worth compressing
  return (constants.RPC_ENCODING_NONE, data)
#
# Begin RPC calls
#

@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query
  @param vg_name: the volume group to list

  """
  params = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", params)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Gets the volume group list.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_params = []
  return self._MultiNodeCall(node_list, "vg_list", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Gets a list of storage units.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query

  """
  params = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modifies a storage unit.

  This is a single-node call.

  """
  params = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Executes an operation on a storage unit.

  This is a single-node call.

  """
  params = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", params)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Checks whether a node has all of the given bridges.

  Used to verify that an instance whose interfaces use these bridges
  can be started on the node.

  This is a single-node call.

  """
  params = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance.

  This is a single-node call.

  @param hvp: hypervisor parameter overrides (see L{_InstDict})
  @param bep: backend parameter overrides (see L{_InstDict})

  """
  inst_data = self._InstDict(instance, hvp=hvp, bep=bep)
  return self._SingleNodeCall(node, "instance_start", [inst_data])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance.

  This is a single-node call.

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_shutdown",
                              [inst_data, timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Gathers the information needed to prepare an instance migration.

  This is a single-node call.

  @type node: string
  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "migration_info", [inst_data])
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares a node to accept an instance.

  This is a single-node call.

  @type node: string
  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type target: string
  @param target: target hostname (usually ip address) (on the node itself)

  """
  params = [self._InstDict(instance), info, target]
  return self._SingleNodeCall(node, "accept_instance", params)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Finalizes any target-node specific migration operation.

  Called both after a successful migration and after an error (in which
  case the migration should be aborted).

  This is a single-node call.

  @type node: string
  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  params = [self._InstDict(instance), info, success]
  return self._SingleNodeCall(node, "finalize_migration", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrates an instance.

  This is a single-node call.

  @type node: string
  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  """
  params = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
  """Reboots an instance.

  This is a single-node call.

  """
  params = [self._InstDict(inst), reboot_type, shutdown_timeout]
  return self._SingleNodeCall(node, "instance_reboot", params)
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug):
  """Installs an OS on the given instance.

  This is a single-node call.

  """
  params = [self._InstDict(inst), reinstall, debug]
  return self._SingleNodeCall(node, "instance_os_add", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Runs the OS rename script for an instance.

  This is a single-node call.

  """
  params = [self._InstDict(inst), old_name, debug]
  return self._SingleNodeCall(node, "instance_run_rename", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @type node: string
  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  """
  return self._SingleNodeCall(node, "instance_info", [instance, hname])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Checks whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migratable", [inst_data])
@_RpcTimeout(_TMO_URGENT)
def call_all_instances_info(self, node_list, hypervisor_list):
  """Returns information about all instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  params = [hypervisor_list]
  return self._MultiNodeCall(node_list, "all_instances_info", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  params = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", params)
@_RpcTimeout(_TMO_FAST)
def call_node_tcp_ping(self, node, source, target, port, timeout,
                       live_port_needed):
  """Performs a TCP ping from the remote node.

  This is a single-node call.

  """
  params = [source, target, port, timeout, live_port_needed]
  return self._SingleNodeCall(node, "node_tcp_ping", params)
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Checks whether a node has the given IP address.

  This is a single-node call.

  """
  params = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", params)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Returns node information.

  This includes memory information and the volume group's size and
  free space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information

  """
  params = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Adds a node to the cluster.

  The key arguments are passed to the node unchanged, in this order.

  This is a single-node call.

  """
  key_material = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", key_material)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Requests verification of the given parameters.

  This is a multi-node call.

  """
  params = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to activate itself as a master.

  This is a single-node call; it does not use the configuration.

  """
  params = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to demote itself from master status.

  This is a single-node call; it does not use the configuration.

  """
  params = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", params)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Queries the master information.

  This is a multi-node call; it does not use the configuration.

  """
  # TODO: should this method query down nodes?
  no_params = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_params)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Queries the node version.

  This is a multi-node call; it does not use the configuration.

  """
  no_params = []
  return cls._StaticMultiNodeCall(node_list, "version", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Requests creation of a given block device.

  This is a single-node call.

  """
  params = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Requests removal of a given block device.

  This is a single-node call.

  """
  params = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Requests renaming of the given block devices.

  This is a single-node call.

  @param devlist: pairs of (device object, new id); each device is
      serialized with C{ToDict()}

  """
  params = [(dev.ToDict(), new_id) for (dev, new_id) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Requests assembling of a given block device.

  This is a single-node call.

  """
  params = [disk.ToDict(), owner, on_primary]
  return self._SingleNodeCall(node, "blockdev_assemble", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Requests shutdown of a given block device.

  This is a single-node call.

  """
  params = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_addchildren(self, node, bdev, ndevs):
  """Requests adding a list of children to a (mirroring) device.

  This is a single-node call.

  """
  children = [child.ToDict() for child in ndevs]
  params = [bdev.ToDict(), children]
  return self._SingleNodeCall(node, "blockdev_addchildren", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_removechildren(self, node, bdev, ndevs):
  """Requests removing a list of children from a (mirroring) device.

  This is a single-node call.

  """
  parent = bdev.ToDict()
  children = [child.ToDict() for child in ndevs]
  return self._SingleNodeCall(node, "blockdev_removechildren",
                              [parent, children])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus(self, node, disks):
  """Requests the status of (mirroring) devices.

  This is a single-node call.

  @return: an L{RpcResult} whose payload, on success, is a list of
      L{objects.BlockDevStatus}

  """
  serialized = [disk.ToDict() for disk in disks]
  result = self._SingleNodeCall(node, "blockdev_getmirrorstatus", serialized)
  if result.fail_msg:
    return result
  result.payload = [objects.BlockDevStatus.FromDict(status)
                    for status in result.payload]
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_find(self, node, disk):
  """Requests identification of a given block device.

  This is a single-node call.

  @return: an L{RpcResult} whose payload, on success and if not None, is
      an L{objects.BlockDevStatus}

  """
  result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
  if result.fail_msg or result.payload is None:
    return result
  result.payload = objects.BlockDevStatus.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  """
  serialized = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, serialized])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Returns the size of the given disks.

  This is a single-node call.

  The method is named after the remote procedure ("blockdev_getsize")
  because L{_RpcTimeout} derives the timeout-table key from the method
  name, and L{Client} refuses procedures missing from that table. Under
  the former name, the key "blockdev_getsizes" was registered while
  "blockdev_getsize" was called, so every call failed.

  @param node: the node to query
  @type disks: list
  @param disks: the disks to query (serialized via C{ToDict()})

  """
  params = [[disk.ToDict() for disk in disks]]
  return self._SingleNodeCall(node, "blockdev_getsize", params)

# Backwards-compatible alias for callers using the former (plural) name
call_blockdev_getsizes = call_blockdev_getsize
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  """
  serialized = [disk.ToDict() for disk in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, serialized])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches (connects) the network of the given drbd devices.

  This is the counterpart of the "drbd_disconnect_net" call.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to run the call on
  @param nodes_ip: passed through to the nodes unchanged
  @type disks: list
  @param disks: the drbd disks (serialized via C{ToDict()})
  @param instance_name: the name of the instance owning the disks
  @param multimaster: presumably whether the devices should allow
      multiple primaries (e.g. for live migration) -- TODO confirm
      against the node-side implementation

  """
  return self._MultiNodeCall(node_list, "drbd_attach_net",
                             [nodes_ip, [cf.ToDict() for cf in disks],
                              instance_name, multimaster])
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of the given drbd devices is complete.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to contact
  @param nodes_ip: passed through to the nodes unchanged (presumably the
      nodes' IP addresses -- confirm against the node-side handler)
  @param disks: the DRBD disk objects to wait for; each is sent via
      C{ToDict()}

  """
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, [cf.ToDict() for cf in disks]])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Query the DRBD helper on the given nodes.

  This is a multi-node call; no arguments are sent.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", no_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Push a local file to a set of nodes.

  Receiving nodes refuse the operation unless the file is on their list
  of approved files.

  This is a multi-node call.

  @type node_list: list
  @param node_list: names of the nodes to upload to
  @type file_name: str
  @param file_name: path of the file to upload; its compressed contents
      are sent together with its mode, uid, gid, atime and mtime
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, used to
      optimize the RPC speed

  """
  # The contents are read before the stat call, as in the original order
  compressed = cls._Compress(utils.ReadFile(file_name))
  info = os.stat(file_name)
  params = [
    file_name,
    compressed,
    info.st_mode,
    info.st_uid,
    info.st_gid,
    info.st_atime,
    info.st_mtime,
    ]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Write the ssconf files on the given nodes.

  This is a multi-node call, usable as a class method.

  @type node_list: list
  @param node_list: the nodes whose ssconf files are written
  @param values: the ssconf values, sent as the single RPC argument

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Ask the given nodes to diagnose their OS definitions.

  This is a multi-node call; no arguments are sent.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Fetch a single OS definition from a node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param name: the OS name to look up
  @return: the RPC result; on success with a dict payload, the payload is
      decoded into an L{objects.OS}

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  # Only a successful call with a dict payload is decoded
  if result.fail_msg or not isinstance(result.payload, dict):
    return result
  result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Run an OS validation routine on a set of nodes.

  This is a multi-node call.

  @param required: passed through to the nodes
  @type nodes: list
  @param nodes: the nodes on which to run the validation
  @param name: the OS name
  @param checks: the checks to run, passed through to the nodes
  @param params: the parameters to validate, passed through to the nodes

  """
  rpc_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes on which to run the hooks
  @param hpath: the hooks path, passed through to the nodes
  @param phase: the hooks phase, passed through to the nodes
  @type env: dict
  @param env: a dictionary with the environment

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: the iallocator name
  @param idata: the json-encoded input string for the iallocator

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request growing of the given block device.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param cf_bdev: the block device object; sent via C{ToDict()}
  @param amount: the amount to grow the device by, passed through to the
      node (units are not visible here -- TODO confirm)

  """
  return self._SingleNodeCall(node, "blockdev_grow",
                              [cf_bdev.ToDict(), amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a disk from one node to another.

  This is a single-node call.

  @type node: string
  @param node: Node name of the exporting node
  @param cf_bdev: the block device object to export; sent via C{ToDict()}
  @param dest_node: the destination node, passed through unchanged
  @param dest_path: the destination path, passed through unchanged
  @param cluster_name: the cluster name, passed through unchanged

  """
  rpc_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
  return self._SingleNodeCall(node, "blockdev_export", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Ask a node to snapshot a block device.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param cf_bdev: the block device object to snapshot; sent via C{ToDict()}

  """
  bdev_dict = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [bdev_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Ask a node to complete an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param snap_disks: the snapshot disks; boolean entries are sent as-is
      (presumably placeholders for disks without a snapshot -- confirm),
      all other entries are sent via C{ToDict()}

  """
  flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                for disk in snap_disks]
  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Query a node for the export information stored under a path.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param path: the export path to inspect

  """
  rpc_args = [path]
  return self._SingleNodeCall(node, "export_info", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """List the exports stored on the given nodes.

  This is a multi-node call; no arguments are sent.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Ask a node to remove one of its exports.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param export: the export to remove, passed through unchanged

  """
  rpc_args = [export]
  return self._SingleNodeCall(node, "export_remove", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Ask a node to drop the cluster information it holds.

  This removes the configuration information from the ganeti data dir.

  This is a single-node call, usable as a class method.

  @type node: string
  @param node: Node name
  @param modify_ssh_setup: passed through to the node unchanged

  """
  rpc_args = [modify_ssh_setup]
  return cls._StaticSingleNodeCall(node, "node_leave_cluster", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """List all volumes present on the given nodes.

  This is a multi-node call; no arguments are sent.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Remove a node from the master candidate role.

  This is a single-node call; no arguments are sent.

  @type node: string
  @param node: Node name

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Attempt to powercycle a node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param hypervisor: the hypervisor identifier, passed through unchanged

  """
  rpc_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", rpc_args)
@_RpcTimeout(None)
def call_test_delay(self, node_list, duration):
  """Make the given nodes sleep for a fixed amount of time.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes that should sleep
  @param duration: how long the nodes should sleep, in seconds; the read
      timeout is set to this value plus five, truncated to an int

  """
  # Allow some slack on top of the requested delay before timing out
  timeout = int(duration + 5)
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Ask a node to create a file storage directory.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param file_storage_dir: the directory to create

  """
  rpc_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_create", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Ask a node to remove a file storage directory.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param file_storage_dir: the directory to remove

  """
  rpc_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_remove", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Ask a node to rename a file storage directory.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param old_file_storage_dir: the current directory name
  @param new_file_storage_dir: the new directory name

  """
  rpc_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Push an updated job queue file to the given nodes.

  This is a multi-node call, usable as a class method.

  @type node_list: list
  @param node_list: the nodes to update
  @param address_list: node addresses, forwarded to the RPC layer
  @param file_name: the job queue file name
  @param content: the new file content; it is sent compressed

  """
  compressed = cls._Compress(content)
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, compressed],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Ask a node to purge its job queue.

  This is a single-node call, usable as a class method; no arguments are
  sent.

  @type node: string
  @param node: Node name

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename job queue files on the given nodes.

  This is a multi-node call, usable as a class method.

  @type node_list: list
  @param node_list: the nodes to contact
  @param address_list: node addresses, forwarded to the RPC layer
  @param rename: used directly as the RPC argument list, without being
      wrapped (presumably a list of (old, new) name pairs -- confirm)

  """
  rpc_args = rename
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Have the given nodes validate a set of hypervisor parameters.

  The supplied parameters are combined, via C{objects.FillDict}, with the
  cluster-level hvparams for C{hvname} (an empty dict if none are set)
  before being sent.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster_defaults = self._cfg.GetClusterInfo().hvparams.get(hvname, {})
  hv_full = objects.FillDict(cluster_defaults, hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Ask a node to generate a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type validity: int
  @param validity: Validity in seconds

  """
  rpc_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Ask a node to delete an X509 certificate.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Certificate name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Start an import listener on a node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: the import options object; sent via C{ToDict()}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: the import destination; sent as-is and also used, with
      C{dest_args}, to build the encoded I/O arguments
  @param dest_args: destination arguments, encoded together with C{dest}

  """
  # Arguments are built in the same order as the remote signature expects
  params = [opts.ToDict(), self._InstDict(instance), dest]
  params.append(_EncodeImportExportIO(dest, dest_args))
  return self._SingleNodeCall(node, "import_start", params)
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Start an export daemon on a node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: the export options object; sent via C{ToDict()}
  @param host: the target host, passed through unchanged
  @param port: the target port, passed through unchanged
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: the export source; sent as-is and also used, with
      C{source_args}, to build the encoded I/O arguments
  @param source_args: source arguments, encoded together with C{source}

  """
  # Arguments are built in the same order as the remote signature expects
  params = [opts.ToDict(), host, port, self._InstDict(instance), source]
  params.append(_EncodeImportExportIO(source, source_args))
  return self._SingleNodeCall(node, "export_start", params)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Fetch the status of one or more imports/exports.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: on success, a payload with one entry per name, either the
      decoded status or None when no status could be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])
  if not result.fail_msg:
    # None entries mark names without a status and are kept as None
    result.payload = [
      None if status is None else objects.ImportExportStatus.FromDict(status)
      for status in result.payload]
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Abort a running import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Clean up the remains of an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)
1481