Package ganeti :: Module rpc
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Inter-node RPC library. 
  23   
  24  """ 
  25   
  26  # pylint: disable-msg=C0103,R0201,R0904 
  27  # C0103: Invalid name, since call_ are not valid 
  28  # R0201: Method could be a function, we keep all rpcs instance methods 
  29  # as not to change them back and forth between static/instance methods 
  30  # if they need to start using instance attributes 
  31  # R0904: Too many public methods 
  32   
  33  import os 
  34  import logging 
  35  import zlib 
  36  import base64 
  37  import pycurl 
  38  import threading 
  39   
  40  from ganeti import utils 
  41  from ganeti import objects 
  42  from ganeti import http 
  43  from ganeti import serializer 
  44  from ganeti import constants 
  45  from ganeti import errors 
  46  from ganeti import netutils 
  47  from ganeti import ssconf 
  48  from ganeti import runtime 
  49   
  50  # pylint has a bug here, doesn't see this import 
  51  import ganeti.http.client  # pylint: disable-msg=W0611 
  52   
  53   
  54  # Timeout for connecting to nodes (seconds) 
  55  _RPC_CONNECT_TIMEOUT = 5 
  56   
  57  _RPC_CLIENT_HEADERS = [ 
  58    "Content-type: %s" % http.HTTP_APP_JSON, 
  59    "Expect:", 
  60    ] 
  61   
  62  # Various time constants for the timeout table 
  63  _TMO_URGENT = 60 # one minute 
  64  _TMO_FAST = 5 * 60 # five minutes 
  65  _TMO_NORMAL = 15 * 60 # 15 minutes 
  66  _TMO_SLOW = 3600 # one hour 
  67  _TMO_4HRS = 4 * 3600 
  68  _TMO_1DAY = 86400 
  69   
  70  # Timeout table that will be built later by decorators 
  71  # Guidelines for choosing timeouts: 
  72  # - call used during watcher: timeout -> 1min, _TMO_URGENT 
  73  # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST 
  74  # - other calls: 15 min, _TMO_NORMAL 
  75  # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts 
  76   
  77  _TIMEOUTS = { 
  78  } 
79 80 81 -def Init():
82 """Initializes the module-global HTTP client manager. 83 84 Must be called before using any RPC function and while exactly one thread is 85 running. 86 87 """ 88 # curl_global_init(3) and curl_global_cleanup(3) must be called with only 89 # one thread running. This check is just a safety measure -- it doesn't 90 # cover all cases. 91 assert threading.activeCount() == 1, \ 92 "Found more than one active thread when initializing pycURL" 93 94 logging.info("Using PycURL %s", pycurl.version) 95 96 pycurl.global_init(pycurl.GLOBAL_ALL)
97
98 99 -def Shutdown():
100 """Stops the module-global HTTP client manager. 101 102 Must be called before quitting the program and while exactly one thread is 103 running. 104 105 """ 106 pycurl.global_cleanup()
107
108 109 -def _ConfigRpcCurl(curl):
110 noded_cert = str(constants.NODED_CERT_FILE) 111 112 curl.setopt(pycurl.FOLLOWLOCATION, False) 113 curl.setopt(pycurl.CAINFO, noded_cert) 114 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 115 curl.setopt(pycurl.SSL_VERIFYPEER, True) 116 curl.setopt(pycurl.SSLCERTTYPE, "PEM") 117 curl.setopt(pycurl.SSLCERT, noded_cert) 118 curl.setopt(pycurl.SSLKEYTYPE, "PEM") 119 curl.setopt(pycurl.SSLKEY, noded_cert) 120 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121 122 123 # Aliasing this module avoids the following warning by epydoc: "Warning: No 124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local" 125 _threading = threading
126 127 128 -class _RpcThreadLocal(_threading.local):
129 - def GetHttpClientPool(self):
130 """Returns a per-thread HTTP client pool. 131 132 @rtype: L{http.client.HttpClientPool} 133 134 """ 135 try: 136 pool = self.hcp 137 except AttributeError: 138 pool = http.client.HttpClientPool(_ConfigRpcCurl) 139 self.hcp = pool 140 141 return pool
142 143 144 # Remove module alias (see above) 145 del _threading 146 147 148 _thread_local = _RpcThreadLocal()
149 150 151 -def _RpcTimeout(secs):
152 """Timeout decorator. 153 154 When applied to a rpc call_* function, it updates the global timeout 155 table with the given function/timeout. 156 157 """ 158 def decorator(f): 159 name = f.__name__ 160 assert name.startswith("call_") 161 _TIMEOUTS[name[len("call_"):]] = secs 162 return f
163 return decorator 164
165 166 -def RunWithRPC(fn):
167 """RPC-wrapper decorator. 168 169 When applied to a function, it runs it with the RPC system 170 initialized, and it shutsdown the system afterwards. This means the 171 function must be called without RPC being initialized. 172 173 """ 174 def wrapper(*args, **kwargs): 175 Init() 176 try: 177 return fn(*args, **kwargs) 178 finally: 179 Shutdown()
180 return wrapper 181
182 183 -class RpcResult(object):
184 """RPC Result class. 185 186 This class holds an RPC result. It is needed since in multi-node 187 calls we can't raise an exception just because one one out of many 188 failed, and therefore we use this class to encapsulate the result. 189 190 @ivar data: the data payload, for successful results, or None 191 @ivar call: the name of the RPC call 192 @ivar node: the name of the node to which we made the call 193 @ivar offline: whether the operation failed because the node was 194 offline, as opposed to actual failure; offline=True will always 195 imply failed=True, in order to allow simpler checking if 196 the user doesn't care about the exact failure mode 197 @ivar fail_msg: the error message if the call failed 198 199 """
200 - def __init__(self, data=None, failed=False, offline=False, 201 call=None, node=None):
202 self.offline = offline 203 self.call = call 204 self.node = node 205 206 if offline: 207 self.fail_msg = "Node is marked offline" 208 self.data = self.payload = None 209 elif failed: 210 self.fail_msg = self._EnsureErr(data) 211 self.data = self.payload = None 212 else: 213 self.data = data 214 if not isinstance(self.data, (tuple, list)): 215 self.fail_msg = ("RPC layer error: invalid result type (%s)" % 216 type(self.data)) 217 self.payload = None 218 elif len(data) != 2: 219 self.fail_msg = ("RPC layer error: invalid result length (%d), " 220 "expected 2" % len(self.data)) 221 self.payload = None 222 elif not self.data[0]: 223 self.fail_msg = self._EnsureErr(self.data[1]) 224 self.payload = None 225 else: 226 # finally success 227 self.fail_msg = None 228 self.payload = data[1] 229 230 for attr_name in ["call", "data", "fail_msg", 231 "node", "offline", "payload"]: 232 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233 234 @staticmethod
235 - def _EnsureErr(val):
236 """Helper to ensure we return a 'True' value for error.""" 237 if val: 238 return val 239 else: 240 return "No error information"
241
242 - def Raise(self, msg, prereq=False, ecode=None):
243 """If the result has failed, raise an OpExecError. 244 245 This is used so that LU code doesn't have to check for each 246 result, but instead can call this function. 247 248 """ 249 if not self.fail_msg: 250 return 251 252 if not msg: # one could pass None for default message 253 msg = ("Call '%s' to node '%s' has failed: %s" % 254 (self.call, self.node, self.fail_msg)) 255 else: 256 msg = "%s: %s" % (msg, self.fail_msg) 257 if prereq: 258 ec = errors.OpPrereqError 259 else: 260 ec = errors.OpExecError 261 if ecode is not None: 262 args = (msg, ecode) 263 else: 264 args = (msg, ) 265 raise ec(*args) # pylint: disable-msg=W0142
266
267 268 -def _AddressLookup(node_list, 269 ssc=ssconf.SimpleStore, 270 nslookup_fn=netutils.Hostname.GetIP):
271 """Return addresses for given node names. 272 273 @type node_list: list 274 @param node_list: List of node names 275 @type ssc: class 276 @param ssc: SimpleStore class that is used to obtain node->ip mappings 277 @type nslookup_fn: callable 278 @param nslookup_fn: function use to do NS lookup 279 @rtype: list of addresses and/or None's 280 @returns: List of corresponding addresses, if found 281 282 """ 283 ss = ssc() 284 iplist = ss.GetNodePrimaryIPList() 285 family = ss.GetPrimaryIPFamily() 286 addresses = [] 287 ipmap = dict(entry.split() for entry in iplist) 288 for node in node_list: 289 address = ipmap.get(node) 290 if address is None: 291 address = nslookup_fn(node, family=family) 292 addresses.append(address) 293 294 return addresses
295
296 297 -class Client:
298 """RPC Client class. 299 300 This class, given a (remote) method name, a list of parameters and a 301 list of nodes, will contact (in parallel) all nodes, and return a 302 dict of results (key: node name, value: result). 303 304 One current bug is that generic failure is still signaled by 305 'False' result, which is not good. This overloading of values can 306 cause bugs. 307 308 """
309 - def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310 assert procedure in _TIMEOUTS, ("New RPC call not declared in the" 311 " timeouts table") 312 self.procedure = procedure 313 self.body = body 314 self.port = port 315 self._request = {} 316 self._address_lookup_fn = address_lookup_fn
317
318 - def ConnectList(self, node_list, address_list=None, read_timeout=None):
319 """Add a list of nodes to the target nodes. 320 321 @type node_list: list 322 @param node_list: the list of node names to connect 323 @type address_list: list or None 324 @keyword address_list: either None or a list with node addresses, 325 which must have the same length as the node list 326 @type read_timeout: int 327 @param read_timeout: overwrites default timeout for operation 328 329 """ 330 if address_list is None: 331 # Always use IP address instead of node name 332 address_list = self._address_lookup_fn(node_list) 333 334 assert len(node_list) == len(address_list), \ 335 "Name and address lists must have the same length" 336 337 for node, address in zip(node_list, address_list): 338 self.ConnectNode(node, address, read_timeout=read_timeout)
339
340 - def ConnectNode(self, name, address=None, read_timeout=None):
341 """Add a node to the target list. 342 343 @type name: str 344 @param name: the node name 345 @type address: str 346 @param address: the node address, if known 347 @type read_timeout: int 348 @param read_timeout: overwrites default timeout for operation 349 350 """ 351 if address is None: 352 # Always use IP address instead of node name 353 address = self._address_lookup_fn([name])[0] 354 355 assert(address is not None) 356 357 if read_timeout is None: 358 read_timeout = _TIMEOUTS[self.procedure] 359 360 self._request[name] = \ 361 http.client.HttpClientRequest(str(address), self.port, 362 http.HTTP_PUT, str("/%s" % self.procedure), 363 headers=_RPC_CLIENT_HEADERS, 364 post_data=str(self.body), 365 read_timeout=read_timeout)
366
367 - def GetResults(self, http_pool=None):
368 """Call nodes and return results. 369 370 @rtype: list 371 @return: List of RPC results 372 373 """ 374 if not http_pool: 375 http_pool = http.client.HttpClientPool(_ConfigRpcCurl) 376 377 http_pool.ProcessRequests(self._request.values()) 378 379 results = {} 380 381 for name, req in self._request.iteritems(): 382 if req.success and req.resp_status_code == http.HTTP_OK: 383 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body), 384 node=name, call=self.procedure) 385 continue 386 387 # TODO: Better error reporting 388 if req.error: 389 msg = req.error 390 else: 391 msg = req.resp_body 392 393 logging.error("RPC error in %s from node %s: %s", 394 self.procedure, name, msg) 395 results[name] = RpcResult(data=msg, failed=True, node=name, 396 call=self.procedure) 397 398 return results
399
400 401 -def _EncodeImportExportIO(ieio, ieioargs):
402 """Encodes import/export I/O information. 403 404 """ 405 if ieio == constants.IEIO_RAW_DISK: 406 assert len(ieioargs) == 1 407 return (ieioargs[0].ToDict(), ) 408 409 if ieio == constants.IEIO_SCRIPT: 410 assert len(ieioargs) == 2 411 return (ieioargs[0].ToDict(), ieioargs[1]) 412 413 return ieioargs
414
415 416 -class RpcRunner(object):
417 """RPC runner class""" 418
419 - def __init__(self, cfg):
420 """Initialized the rpc runner. 421 422 @type cfg: C{config.ConfigWriter} 423 @param cfg: the configuration object that will be used to get data 424 about the cluster 425 426 """ 427 self._cfg = cfg 428 self.port = netutils.GetDaemonPort(constants.NODED)
429
430 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431 """Convert the given instance to a dict. 432 433 This is done via the instance's ToDict() method and additionally 434 we fill the hvparams with the cluster defaults. 435 436 @type instance: L{objects.Instance} 437 @param instance: an Instance object 438 @type hvp: dict or None 439 @param hvp: a dictionary with overridden hypervisor parameters 440 @type bep: dict or None 441 @param bep: a dictionary with overridden backend parameters 442 @type osp: dict or None 443 @param osp: a dictionary with overridden os parameters 444 @rtype: dict 445 @return: the instance dict, with the hvparams filled with the 446 cluster defaults 447 448 """ 449 idict = instance.ToDict() 450 cluster = self._cfg.GetClusterInfo() 451 idict["hvparams"] = cluster.FillHV(instance) 452 if hvp is not None: 453 idict["hvparams"].update(hvp) 454 idict["beparams"] = cluster.FillBE(instance) 455 if bep is not None: 456 idict["beparams"].update(bep) 457 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams) 458 if osp is not None: 459 idict["osparams"].update(osp) 460 for nic in idict["nics"]: 461 nic['nicparams'] = objects.FillDict( 462 cluster.nicparams[constants.PP_DEFAULT], 463 nic['nicparams']) 464 return idict
465
466 - def _ConnectList(self, client, node_list, call, read_timeout=None):
467 """Helper for computing node addresses. 468 469 @type client: L{ganeti.rpc.Client} 470 @param client: a C{Client} instance 471 @type node_list: list 472 @param node_list: the node list we should connect 473 @type call: string 474 @param call: the name of the remote procedure call, for filling in 475 correctly any eventual offline nodes' results 476 @type read_timeout: int 477 @param read_timeout: overwrites the default read timeout for the 478 given operation 479 480 """ 481 all_nodes = self._cfg.GetAllNodesInfo() 482 name_list = [] 483 addr_list = [] 484 skip_dict = {} 485 for node in node_list: 486 if node in all_nodes: 487 if all_nodes[node].offline: 488 skip_dict[node] = RpcResult(node=node, offline=True, call=call) 489 continue 490 val = all_nodes[node].primary_ip 491 else: 492 val = None 493 addr_list.append(val) 494 name_list.append(node) 495 if name_list: 496 client.ConnectList(name_list, address_list=addr_list, 497 read_timeout=read_timeout) 498 return skip_dict
499
500 - def _ConnectNode(self, client, node, call, read_timeout=None):
501 """Helper for computing one node's address. 502 503 @type client: L{ganeti.rpc.Client} 504 @param client: a C{Client} instance 505 @type node: str 506 @param node: the node we should connect 507 @type call: string 508 @param call: the name of the remote procedure call, for filling in 509 correctly any eventual offline nodes' results 510 @type read_timeout: int 511 @param read_timeout: overwrites the default read timeout for the 512 given operation 513 514 """ 515 node_info = self._cfg.GetNodeInfo(node) 516 if node_info is not None: 517 if node_info.offline: 518 return RpcResult(node=node, offline=True, call=call) 519 addr = node_info.primary_ip 520 else: 521 addr = None 522 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523
524 - def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525 """Helper for making a multi-node call 526 527 """ 528 body = serializer.DumpJson(args, indent=False) 529 c = Client(procedure, body, self.port) 530 skip_dict = self._ConnectList(c, node_list, procedure, 531 read_timeout=read_timeout) 532 skip_dict.update(c.GetResults()) 533 return skip_dict
534 535 @classmethod
536 - def _StaticMultiNodeCall(cls, node_list, procedure, args, 537 address_list=None, read_timeout=None):
538 """Helper for making a multi-node static call 539 540 """ 541 body = serializer.DumpJson(args, indent=False) 542 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED)) 543 c.ConnectList(node_list, address_list=address_list, 544 read_timeout=read_timeout) 545 return c.GetResults()
546
547 - def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548 """Helper for making a single-node call 549 550 """ 551 body = serializer.DumpJson(args, indent=False) 552 c = Client(procedure, body, self.port) 553 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout) 554 if result is None: 555 # we did connect, node is not offline 556 result = c.GetResults()[node] 557 return result
558 559 @classmethod
560 - def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561 """Helper for making a single-node static call 562 563 """ 564 body = serializer.DumpJson(args, indent=False) 565 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED)) 566 c.ConnectNode(node, read_timeout=read_timeout) 567 return c.GetResults()[node]
568 569 @staticmethod
570 - def _Compress(data):
571 """Compresses a string for transport over RPC. 572 573 Small amounts of data are not compressed. 574 575 @type data: str 576 @param data: Data 577 @rtype: tuple 578 @return: Encoded data to send 579 580 """ 581 # Small amounts of data are not compressed 582 if len(data) < 512: 583 return (constants.RPC_ENCODING_NONE, data) 584 585 # Compress with zlib and encode in base64 586 return (constants.RPC_ENCODING_ZLIB_BASE64, 587 base64.b64encode(zlib.compress(data, 3)))
588 589 # 590 # Begin RPC calls 591 # 592 593 @_RpcTimeout(_TMO_URGENT)
594 - def call_lv_list(self, node_list, vg_name):
595 """Gets the logical volumes present in a given volume group. 596 597 This is a multi-node call. 598 599 """ 600 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
601 602 @_RpcTimeout(_TMO_URGENT)
603 - def call_vg_list(self, node_list):
604 """Gets the volume group list. 605 606 This is a multi-node call. 607 608 """ 609 return self._MultiNodeCall(node_list, "vg_list", [])
610 611 @_RpcTimeout(_TMO_NORMAL)
612 - def call_storage_list(self, node_list, su_name, su_args, name, fields):
613 """Get list of storage units. 614 615 This is a multi-node call. 616 617 """ 618 return self._MultiNodeCall(node_list, "storage_list", 619 [su_name, su_args, name, fields])
620 621 @_RpcTimeout(_TMO_NORMAL)
622 - def call_storage_modify(self, node, su_name, su_args, name, changes):
623 """Modify a storage unit. 624 625 This is a single-node call. 626 627 """ 628 return self._SingleNodeCall(node, "storage_modify", 629 [su_name, su_args, name, changes])
630 631 @_RpcTimeout(_TMO_NORMAL)
632 - def call_storage_execute(self, node, su_name, su_args, name, op):
633 """Executes an operation on a storage unit. 634 635 This is a single-node call. 636 637 """ 638 return self._SingleNodeCall(node, "storage_execute", 639 [su_name, su_args, name, op])
640 641 @_RpcTimeout(_TMO_URGENT)
642 - def call_bridges_exist(self, node, bridges_list):
643 """Checks if a node has all the bridges given. 644 645 This method checks if all bridges given in the bridges_list are 646 present on the remote node, so that an instance that uses interfaces 647 on those bridges can be started. 648 649 This is a single-node call. 650 651 """ 652 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
653 654 @_RpcTimeout(_TMO_NORMAL)
655 - def call_instance_start(self, node, instance, hvp, bep):
656 """Starts an instance. 657 658 This is a single-node call. 659 660 """ 661 idict = self._InstDict(instance, hvp=hvp, bep=bep) 662 return self._SingleNodeCall(node, "instance_start", [idict])
663 664 @_RpcTimeout(_TMO_NORMAL)
665 - def call_instance_shutdown(self, node, instance, timeout):
666 """Stops an instance. 667 668 This is a single-node call. 669 670 """ 671 return self._SingleNodeCall(node, "instance_shutdown", 672 [self._InstDict(instance), timeout])
673 674 @_RpcTimeout(_TMO_NORMAL)
675 - def call_migration_info(self, node, instance):
676 """Gather the information necessary to prepare an instance migration. 677 678 This is a single-node call. 679 680 @type node: string 681 @param node: the node on which the instance is currently running 682 @type instance: C{objects.Instance} 683 @param instance: the instance definition 684 685 """ 686 return self._SingleNodeCall(node, "migration_info", 687 [self._InstDict(instance)])
688 689 @_RpcTimeout(_TMO_NORMAL)
690 - def call_accept_instance(self, node, instance, info, target):
691 """Prepare a node to accept an instance. 692 693 This is a single-node call. 694 695 @type node: string 696 @param node: the target node for the migration 697 @type instance: C{objects.Instance} 698 @param instance: the instance definition 699 @type info: opaque/hypervisor specific (string/data) 700 @param info: result for the call_migration_info call 701 @type target: string 702 @param target: target hostname (usually ip address) (on the node itself) 703 704 """ 705 return self._SingleNodeCall(node, "accept_instance", 706 [self._InstDict(instance), info, target])
707 708 @_RpcTimeout(_TMO_NORMAL)
709 - def call_finalize_migration(self, node, instance, info, success):
710 """Finalize any target-node migration specific operation. 711 712 This is called both in case of a successful migration and in case of error 713 (in which case it should abort the migration). 714 715 This is a single-node call. 716 717 @type node: string 718 @param node: the target node for the migration 719 @type instance: C{objects.Instance} 720 @param instance: the instance definition 721 @type info: opaque/hypervisor specific (string/data) 722 @param info: result for the call_migration_info call 723 @type success: boolean 724 @param success: whether the migration was a success or a failure 725 726 """ 727 return self._SingleNodeCall(node, "finalize_migration", 728 [self._InstDict(instance), info, success])
729 730 @_RpcTimeout(_TMO_SLOW)
731 - def call_instance_migrate(self, node, instance, target, live):
732 """Migrate an instance. 733 734 This is a single-node call. 735 736 @type node: string 737 @param node: the node on which the instance is currently running 738 @type instance: C{objects.Instance} 739 @param instance: the instance definition 740 @type target: string 741 @param target: the target node name 742 @type live: boolean 743 @param live: whether the migration should be done live or not (the 744 interpretation of this parameter is left to the hypervisor) 745 746 """ 747 return self._SingleNodeCall(node, "instance_migrate", 748 [self._InstDict(instance), target, live])
749 750 @_RpcTimeout(_TMO_NORMAL)
751 - def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
752 """Reboots an instance. 753 754 This is a single-node call. 755 756 """ 757 return self._SingleNodeCall(node, "instance_reboot", 758 [self._InstDict(inst), reboot_type, 759 shutdown_timeout])
760 761 @_RpcTimeout(_TMO_1DAY)
762 - def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
763 """Installs an OS on the given instance. 764 765 This is a single-node call. 766 767 """ 768 return self._SingleNodeCall(node, "instance_os_add", 769 [self._InstDict(inst, osp=osparams), 770 reinstall, debug])
771 772 @_RpcTimeout(_TMO_SLOW)
773 - def call_instance_run_rename(self, node, inst, old_name, debug):
774 """Run the OS rename script for an instance. 775 776 This is a single-node call. 777 778 """ 779 return self._SingleNodeCall(node, "instance_run_rename", 780 [self._InstDict(inst), old_name, debug])
781 782 @_RpcTimeout(_TMO_URGENT)
783 - def call_instance_info(self, node, instance, hname):
784 """Returns information about a single instance. 785 786 This is a single-node call. 787 788 @type node: list 789 @param node: the list of nodes to query 790 @type instance: string 791 @param instance: the instance name 792 @type hname: string 793 @param hname: the hypervisor type of the instance 794 795 """ 796 return self._SingleNodeCall(node, "instance_info", [instance, hname])
797 798 @_RpcTimeout(_TMO_NORMAL)
799 - def call_instance_migratable(self, node, instance):
800 """Checks whether the given instance can be migrated. 801 802 This is a single-node call. 803 804 @param node: the node to query 805 @type instance: L{objects.Instance} 806 @param instance: the instance to check 807 808 809 """ 810 return self._SingleNodeCall(node, "instance_migratable", 811 [self._InstDict(instance)])
812 813 @_RpcTimeout(_TMO_URGENT)
814 - def call_all_instances_info(self, node_list, hypervisor_list):
815 """Returns information about all instances on the given nodes. 816 817 This is a multi-node call. 818 819 @type node_list: list 820 @param node_list: the list of nodes to query 821 @type hypervisor_list: list 822 @param hypervisor_list: the hypervisors to query for instances 823 824 """ 825 return self._MultiNodeCall(node_list, "all_instances_info", 826 [hypervisor_list])
827 828 @_RpcTimeout(_TMO_URGENT)
829 - def call_instance_list(self, node_list, hypervisor_list):
830 """Returns the list of running instances on a given node. 831 832 This is a multi-node call. 833 834 @type node_list: list 835 @param node_list: the list of nodes to query 836 @type hypervisor_list: list 837 @param hypervisor_list: the hypervisors to query for instances 838 839 """ 840 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
841 842 @_RpcTimeout(_TMO_FAST)
843 - def call_node_tcp_ping(self, node, source, target, port, timeout, 844 live_port_needed):
845 """Do a TcpPing on the remote node 846 847 This is a single-node call. 848 849 """ 850 return self._SingleNodeCall(node, "node_tcp_ping", 851 [source, target, port, timeout, 852 live_port_needed])
853 854 @_RpcTimeout(_TMO_FAST)
855 - def call_node_has_ip_address(self, node, address):
856 """Checks if a node has the given IP address. 857 858 This is a single-node call. 859 860 """ 861 return self._SingleNodeCall(node, "node_has_ip_address", [address])
862 863 @_RpcTimeout(_TMO_URGENT)
864 - def call_node_info(self, node_list, vg_name, hypervisor_type):
865 """Return node information. 866 867 This will return memory information and volume group size and free 868 space. 869 870 This is a multi-node call. 871 872 @type node_list: list 873 @param node_list: the list of nodes to query 874 @type vg_name: C{string} 875 @param vg_name: the name of the volume group to ask for disk space 876 information 877 @type hypervisor_type: C{str} 878 @param hypervisor_type: the name of the hypervisor to ask for 879 memory information 880 881 """ 882 return self._MultiNodeCall(node_list, "node_info", 883 [vg_name, hypervisor_type])
884 885 @_RpcTimeout(_TMO_NORMAL)
886 - def call_etc_hosts_modify(self, node, mode, name, ip):
887 """Modify hosts file with name 888 889 @type node: string 890 @param node: The node to call 891 @type mode: string 892 @param mode: The mode to operate. Currently "add" or "remove" 893 @type name: string 894 @param name: The host name to be modified 895 @type ip: string 896 @param ip: The ip of the entry (just valid if mode is "add") 897 898 """ 899 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
900 901 @_RpcTimeout(_TMO_NORMAL)
902 - def call_node_verify(self, node_list, checkdict, cluster_name):
903 """Request verification of given parameters. 904 905 This is a multi-node call. 906 907 """ 908 return self._MultiNodeCall(node_list, "node_verify", 909 [checkdict, cluster_name])
910 911 @classmethod 912 @_RpcTimeout(_TMO_FAST)
913 - def call_node_start_master(cls, node, start_daemons, no_voting):
914 """Tells a node to activate itself as a master. 915 916 This is a single-node call. 917 918 """ 919 return cls._StaticSingleNodeCall(node, "node_start_master", 920 [start_daemons, no_voting])
921 922 @classmethod 923 @_RpcTimeout(_TMO_FAST)
924 - def call_node_stop_master(cls, node, stop_daemons):
925 """Tells a node to demote itself from master status. 926 927 This is a single-node call. 928 929 """ 930 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
931 932 @classmethod 933 @_RpcTimeout(_TMO_URGENT)
934 - def call_master_info(cls, node_list):
935 """Query master info. 936 937 This is a multi-node call. 938 939 """ 940 # TODO: should this method query down nodes? 941 return cls._StaticMultiNodeCall(node_list, "master_info", [])
942 943 @classmethod 944 @_RpcTimeout(_TMO_URGENT)
945 - def call_version(cls, node_list):
946 """Query node version. 947 948 This is a multi-node call. 949 950 """ 951 return cls._StaticMultiNodeCall(node_list, "version", [])
952 953 @_RpcTimeout(_TMO_NORMAL)
954 - def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
955 """Request creation of a given block device. 956 957 This is a single-node call. 958 959 """ 960 return self._SingleNodeCall(node, "blockdev_create", 961 [bdev.ToDict(), size, owner, on_primary, info])
962 963 @_RpcTimeout(_TMO_SLOW)
964 - def call_blockdev_wipe(self, node, bdev, offset, size):
965 """Request wipe at given offset with given size of a block device. 966 967 This is a single-node call. 968 969 """ 970 return self._SingleNodeCall(node, "blockdev_wipe", 971 [bdev.ToDict(), offset, size])
972 973 @_RpcTimeout(_TMO_NORMAL)
974 - def call_blockdev_remove(self, node, bdev):
975 """Request removal of a given block device. 976 977 This is a single-node call. 978 979 """ 980 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
981 982 @_RpcTimeout(_TMO_NORMAL)
983 - def call_blockdev_rename(self, node, devlist):
984 """Request rename of the given block devices. 985 986 This is a single-node call. 987 988 """ 989 return self._SingleNodeCall(node, "blockdev_rename", 990 [(d.ToDict(), uid) for d, uid in devlist])
991 992 @_RpcTimeout(_TMO_NORMAL)
993 - def call_blockdev_pause_resume_sync(self, node, disks, pause):
994 """Request a pause/resume of given block device. 995 996 This is a single-node call. 997 998 """ 999 return self._SingleNodeCall(node, "blockdev_pause_resume_sync", 1000 [[bdev.ToDict() for bdev in disks], pause])
1001 1002 @_RpcTimeout(_TMO_NORMAL)
1003 - def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1004 """Request assembling of a given block device. 1005 1006 This is a single-node call. 1007 1008 """ 1009 return self._SingleNodeCall(node, "blockdev_assemble", 1010 [disk.ToDict(), owner, on_primary, idx])
1011 1012 @_RpcTimeout(_TMO_NORMAL)
1013 - def call_blockdev_shutdown(self, node, disk):
1014 """Request shutdown of a given block device. 1015 1016 This is a single-node call. 1017 1018 """ 1019 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1020 1021 @_RpcTimeout(_TMO_NORMAL)
1022 - def call_blockdev_addchildren(self, node, bdev, ndevs):
1023 """Request adding a list of children to a (mirroring) device. 1024 1025 This is a single-node call. 1026 1027 """ 1028 return self._SingleNodeCall(node, "blockdev_addchildren", 1029 [bdev.ToDict(), 1030 [disk.ToDict() for disk in ndevs]])
1031 1032 @_RpcTimeout(_TMO_NORMAL)
1033 - def call_blockdev_removechildren(self, node, bdev, ndevs):
1034 """Request removing a list of children from a (mirroring) device. 1035 1036 This is a single-node call. 1037 1038 """ 1039 return self._SingleNodeCall(node, "blockdev_removechildren", 1040 [bdev.ToDict(), 1041 [disk.ToDict() for disk in ndevs]])
1042 1043 @_RpcTimeout(_TMO_NORMAL)
1044 - def call_blockdev_getmirrorstatus(self, node, disks):
1045 """Request status of a (mirroring) device. 1046 1047 This is a single-node call. 1048 1049 """ 1050 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus", 1051 [dsk.ToDict() for dsk in disks]) 1052 if not result.fail_msg: 1053 result.payload = [objects.BlockDevStatus.FromDict(i) 1054 for i in result.payload] 1055 return result
1056 1057 @_RpcTimeout(_TMO_NORMAL)
1058 - def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1059 """Request status of (mirroring) devices from multiple nodes. 1060 1061 This is a multi-node call. 1062 1063 """ 1064 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", 1065 [dict((name, [dsk.ToDict() for dsk in disks]) 1066 for name, disks in node_disks.items())]) 1067 for nres in result.values(): 1068 if nres.fail_msg: 1069 continue 1070 1071 for idx, (success, status) in enumerate(nres.payload): 1072 if success: 1073 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) 1074 1075 return result
1076 1077 @_RpcTimeout(_TMO_NORMAL)
1078 - def call_blockdev_find(self, node, disk):
1079 """Request identification of a given block device. 1080 1081 This is a single-node call. 1082 1083 """ 1084 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()]) 1085 if not result.fail_msg and result.payload is not None: 1086 result.payload = objects.BlockDevStatus.FromDict(result.payload) 1087 return result
1088 1089 @_RpcTimeout(_TMO_NORMAL)
1090 - def call_blockdev_close(self, node, instance_name, disks):
1091 """Closes the given block devices. 1092 1093 This is a single-node call. 1094 1095 """ 1096 params = [instance_name, [cf.ToDict() for cf in disks]] 1097 return self._SingleNodeCall(node, "blockdev_close", params)
1098 1099 @_RpcTimeout(_TMO_NORMAL)
1100 - def call_blockdev_getsize(self, node, disks):
1101 """Returns the size of the given disks. 1102 1103 This is a single-node call. 1104 1105 """ 1106 params = [[cf.ToDict() for cf in disks]] 1107 return self._SingleNodeCall(node, "blockdev_getsize", params)
1108 1109 @_RpcTimeout(_TMO_NORMAL)
1110 - def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1111 """Disconnects the network of the given drbd devices. 1112 1113 This is a multi-node call. 1114 1115 """ 1116 return self._MultiNodeCall(node_list, "drbd_disconnect_net", 1117 [nodes_ip, [cf.ToDict() for cf in disks]])
1118 1119 @_RpcTimeout(_TMO_NORMAL)
1120 - def call_drbd_attach_net(self, node_list, nodes_ip, 1121 disks, instance_name, multimaster):
1122 """Disconnects the given drbd devices. 1123 1124 This is a multi-node call. 1125 1126 """ 1127 return self._MultiNodeCall(node_list, "drbd_attach_net", 1128 [nodes_ip, [cf.ToDict() for cf in disks], 1129 instance_name, multimaster])
1130 1131 @_RpcTimeout(_TMO_SLOW)
1132 - def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1133 """Waits for the synchronization of drbd devices is complete. 1134 1135 This is a multi-node call. 1136 1137 """ 1138 return self._MultiNodeCall(node_list, "drbd_wait_sync", 1139 [nodes_ip, [cf.ToDict() for cf in disks]])
1140 1141 @_RpcTimeout(_TMO_URGENT)
1142 - def call_drbd_helper(self, node_list):
1143 """Gets drbd helper. 1144 1145 This is a multi-node call. 1146 1147 """ 1148 return self._MultiNodeCall(node_list, "drbd_helper", [])
1149 1150 @classmethod 1151 @_RpcTimeout(_TMO_NORMAL)
1152 - def call_upload_file(cls, node_list, file_name, address_list=None):
1153 """Upload a file. 1154 1155 The node will refuse the operation in case the file is not on the 1156 approved file list. 1157 1158 This is a multi-node call. 1159 1160 @type node_list: list 1161 @param node_list: the list of node names to upload to 1162 @type file_name: str 1163 @param file_name: the filename to upload 1164 @type address_list: list or None 1165 @keyword address_list: an optional list of node addresses, in order 1166 to optimize the RPC speed 1167 1168 """ 1169 file_contents = utils.ReadFile(file_name) 1170 data = cls._Compress(file_contents) 1171 st = os.stat(file_name) 1172 getents = runtime.GetEnts() 1173 params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid), 1174 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] 1175 return cls._StaticMultiNodeCall(node_list, "upload_file", params, 1176 address_list=address_list)
1177 1178 @classmethod 1179 @_RpcTimeout(_TMO_NORMAL)
1180 - def call_write_ssconf_files(cls, node_list, values):
1181 """Write ssconf files. 1182 1183 This is a multi-node call. 1184 1185 """ 1186 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1187 1188 @_RpcTimeout(_TMO_NORMAL)
1189 - def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1190 """Runs OOB. 1191 1192 This is a single-node call. 1193 1194 """ 1195 return self._SingleNodeCall(node, "run_oob", [oob_program, command, 1196 remote_node, timeout])
1197 1198 @_RpcTimeout(_TMO_FAST)
1199 - def call_os_diagnose(self, node_list):
1200 """Request a diagnose of OS definitions. 1201 1202 This is a multi-node call. 1203 1204 """ 1205 return self._MultiNodeCall(node_list, "os_diagnose", [])
1206 1207 @_RpcTimeout(_TMO_FAST)
1208 - def call_os_get(self, node, name):
1209 """Returns an OS definition. 1210 1211 This is a single-node call. 1212 1213 """ 1214 result = self._SingleNodeCall(node, "os_get", [name]) 1215 if not result.fail_msg and isinstance(result.payload, dict): 1216 result.payload = objects.OS.FromDict(result.payload) 1217 return result
1218 1219 @_RpcTimeout(_TMO_FAST)
1220 - def call_os_validate(self, required, nodes, name, checks, params):
1221 """Run a validation routine for a given OS. 1222 1223 This is a multi-node call. 1224 1225 """ 1226 return self._MultiNodeCall(nodes, "os_validate", 1227 [required, name, checks, params])
1228 1229 @_RpcTimeout(_TMO_NORMAL)
1230 - def call_hooks_runner(self, node_list, hpath, phase, env):
1231 """Call the hooks runner. 1232 1233 Args: 1234 - op: the OpCode instance 1235 - env: a dictionary with the environment 1236 1237 This is a multi-node call. 1238 1239 """ 1240 params = [hpath, phase, env] 1241 return self._MultiNodeCall(node_list, "hooks_runner", params)
1242 1243 @_RpcTimeout(_TMO_NORMAL)
1244 - def call_iallocator_runner(self, node, name, idata):
1245 """Call an iallocator on a remote node 1246 1247 Args: 1248 - name: the iallocator name 1249 - input: the json-encoded input string 1250 1251 This is a single-node call. 1252 1253 """ 1254 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1255 1256 @_RpcTimeout(_TMO_NORMAL)
1257 - def call_blockdev_grow(self, node, cf_bdev, amount):
1258 """Request a snapshot of the given block device. 1259 1260 This is a single-node call. 1261 1262 """ 1263 return self._SingleNodeCall(node, "blockdev_grow", 1264 [cf_bdev.ToDict(), amount])
1265 1266 @_RpcTimeout(_TMO_1DAY)
1267 - def call_blockdev_export(self, node, cf_bdev, 1268 dest_node, dest_path, cluster_name):
1269 """Export a given disk to another node. 1270 1271 This is a single-node call. 1272 1273 """ 1274 return self._SingleNodeCall(node, "blockdev_export", 1275 [cf_bdev.ToDict(), dest_node, dest_path, 1276 cluster_name])
1277 1278 @_RpcTimeout(_TMO_NORMAL)
1279 - def call_blockdev_snapshot(self, node, cf_bdev):
1280 """Request a snapshot of the given block device. 1281 1282 This is a single-node call. 1283 1284 """ 1285 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1286 1287 @_RpcTimeout(_TMO_NORMAL)
1288 - def call_finalize_export(self, node, instance, snap_disks):
1289 """Request the completion of an export operation. 1290 1291 This writes the export config file, etc. 1292 1293 This is a single-node call. 1294 1295 """ 1296 flat_disks = [] 1297 for disk in snap_disks: 1298 if isinstance(disk, bool): 1299 flat_disks.append(disk) 1300 else: 1301 flat_disks.append(disk.ToDict()) 1302 1303 return self._SingleNodeCall(node, "finalize_export", 1304 [self._InstDict(instance), flat_disks])
1305 1306 @_RpcTimeout(_TMO_FAST)
1307 - def call_export_info(self, node, path):
1308 """Queries the export information in a given path. 1309 1310 This is a single-node call. 1311 1312 """ 1313 return self._SingleNodeCall(node, "export_info", [path])
1314 1315 @_RpcTimeout(_TMO_FAST)
1316 - def call_export_list(self, node_list):
1317 """Gets the stored exports list. 1318 1319 This is a multi-node call. 1320 1321 """ 1322 return self._MultiNodeCall(node_list, "export_list", [])
1323 1324 @_RpcTimeout(_TMO_FAST)
1325 - def call_export_remove(self, node, export):
1326 """Requests removal of a given export. 1327 1328 This is a single-node call. 1329 1330 """ 1331 return self._SingleNodeCall(node, "export_remove", [export])
1332 1333 @classmethod 1334 @_RpcTimeout(_TMO_NORMAL)
1335 - def call_node_leave_cluster(cls, node, modify_ssh_setup):
1336 """Requests a node to clean the cluster information it has. 1337 1338 This will remove the configuration information from the ganeti data 1339 dir. 1340 1341 This is a single-node call. 1342 1343 """ 1344 return cls._StaticSingleNodeCall(node, "node_leave_cluster", 1345 [modify_ssh_setup])
1346 1347 @_RpcTimeout(_TMO_FAST)
1348 - def call_node_volumes(self, node_list):
1349 """Gets all volumes on node(s). 1350 1351 This is a multi-node call. 1352 1353 """ 1354 return self._MultiNodeCall(node_list, "node_volumes", [])
1355 1356 @_RpcTimeout(_TMO_FAST)
1357 - def call_node_demote_from_mc(self, node):
1358 """Demote a node from the master candidate role. 1359 1360 This is a single-node call. 1361 1362 """ 1363 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1364 1365 @_RpcTimeout(_TMO_NORMAL)
1366 - def call_node_powercycle(self, node, hypervisor):
1367 """Tries to powercycle a node. 1368 1369 This is a single-node call. 1370 1371 """ 1372 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1373 1374 @_RpcTimeout(None)
1375 - def call_test_delay(self, node_list, duration):
1376 """Sleep for a fixed time on given node(s). 1377 1378 This is a multi-node call. 1379 1380 """ 1381 return self._MultiNodeCall(node_list, "test_delay", [duration], 1382 read_timeout=int(duration + 5))
1383 1384 @_RpcTimeout(_TMO_FAST)
1385 - def call_file_storage_dir_create(self, node, file_storage_dir):
1386 """Create the given file storage directory. 1387 1388 This is a single-node call. 1389 1390 """ 1391 return self._SingleNodeCall(node, "file_storage_dir_create", 1392 [file_storage_dir])
1393 1394 @_RpcTimeout(_TMO_FAST)
1395 - def call_file_storage_dir_remove(self, node, file_storage_dir):
1396 """Remove the given file storage directory. 1397 1398 This is a single-node call. 1399 1400 """ 1401 return self._SingleNodeCall(node, "file_storage_dir_remove", 1402 [file_storage_dir])
1403 1404 @_RpcTimeout(_TMO_FAST)
1405 - def call_file_storage_dir_rename(self, node, old_file_storage_dir, 1406 new_file_storage_dir):
1407 """Rename file storage directory. 1408 1409 This is a single-node call. 1410 1411 """ 1412 return self._SingleNodeCall(node, "file_storage_dir_rename", 1413 [old_file_storage_dir, new_file_storage_dir])
1414 1415 @classmethod 1416 @_RpcTimeout(_TMO_URGENT)
1417 - def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1418 """Update job queue. 1419 1420 This is a multi-node call. 1421 1422 """ 1423 return cls._StaticMultiNodeCall(node_list, "jobqueue_update", 1424 [file_name, cls._Compress(content)], 1425 address_list=address_list)
1426 1427 @classmethod 1428 @_RpcTimeout(_TMO_NORMAL)
1429 - def call_jobqueue_purge(cls, node):
1430 """Purge job queue. 1431 1432 This is a single-node call. 1433 1434 """ 1435 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1436 1437 @classmethod 1438 @_RpcTimeout(_TMO_URGENT)
1439 - def call_jobqueue_rename(cls, node_list, address_list, rename):
1440 """Rename a job queue file. 1441 1442 This is a multi-node call. 1443 1444 """ 1445 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename, 1446 address_list=address_list)
1447 1448 @_RpcTimeout(_TMO_NORMAL)
1449 - def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1450 """Validate the hypervisor params. 1451 1452 This is a multi-node call. 1453 1454 @type node_list: list 1455 @param node_list: the list of nodes to query 1456 @type hvname: string 1457 @param hvname: the hypervisor name 1458 @type hvparams: dict 1459 @param hvparams: the hypervisor parameters to be validated 1460 1461 """ 1462 cluster = self._cfg.GetClusterInfo() 1463 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) 1464 return self._MultiNodeCall(node_list, "hypervisor_validate_params", 1465 [hvname, hv_full])
1466 1467 @_RpcTimeout(_TMO_NORMAL)
1468 - def call_x509_cert_create(self, node, validity):
1469 """Creates a new X509 certificate for SSL/TLS. 1470 1471 This is a single-node call. 1472 1473 @type validity: int 1474 @param validity: Validity in seconds 1475 1476 """ 1477 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1478 1479 @_RpcTimeout(_TMO_NORMAL)
1480 - def call_x509_cert_remove(self, node, name):
1481 """Removes a X509 certificate. 1482 1483 This is a single-node call. 1484 1485 @type name: string 1486 @param name: Certificate name 1487 1488 """ 1489 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1490 1491 @_RpcTimeout(_TMO_NORMAL)
1492 - def call_import_start(self, node, opts, instance, dest, dest_args):
1493 """Starts a listener for an import. 1494 1495 This is a single-node call. 1496 1497 @type node: string 1498 @param node: Node name 1499 @type instance: C{objects.Instance} 1500 @param instance: Instance object 1501 1502 """ 1503 return self._SingleNodeCall(node, "import_start", 1504 [opts.ToDict(), 1505 self._InstDict(instance), dest, 1506 _EncodeImportExportIO(dest, dest_args)])
1507 1508 @_RpcTimeout(_TMO_NORMAL)
1509 - def call_export_start(self, node, opts, host, port, 1510 instance, source, source_args):
1511 """Starts an export daemon. 1512 1513 This is a single-node call. 1514 1515 @type node: string 1516 @param node: Node name 1517 @type instance: C{objects.Instance} 1518 @param instance: Instance object 1519 1520 """ 1521 return self._SingleNodeCall(node, "export_start", 1522 [opts.ToDict(), host, port, 1523 self._InstDict(instance), source, 1524 _EncodeImportExportIO(source, source_args)])
1525 1526 @_RpcTimeout(_TMO_FAST)
1527 - def call_impexp_status(self, node, names):
1528 """Gets the status of an import or export. 1529 1530 This is a single-node call. 1531 1532 @type node: string 1533 @param node: Node name 1534 @type names: List of strings 1535 @param names: Import/export names 1536 @rtype: List of L{objects.ImportExportStatus} instances 1537 @return: Returns a list of the state of each named import/export or None if 1538 a status couldn't be retrieved 1539 1540 """ 1541 result = self._SingleNodeCall(node, "impexp_status", [names]) 1542 1543 if not result.fail_msg: 1544 decoded = [] 1545 1546 for i in result.payload: 1547 if i is None: 1548 decoded.append(None) 1549 continue 1550 decoded.append(objects.ImportExportStatus.FromDict(i)) 1551 1552 result.payload = decoded 1553 1554 return result
1555 1556 @_RpcTimeout(_TMO_NORMAL)
1557 - def call_impexp_abort(self, node, name):
1558 """Aborts an import or export. 1559 1560 This is a single-node call. 1561 1562 @type node: string 1563 @param node: Node name 1564 @type name: string 1565 @param name: Import/export name 1566 1567 """ 1568 return self._SingleNodeCall(node, "impexp_abort", [name])
1569 1570 @_RpcTimeout(_TMO_NORMAL)
1571 - def call_impexp_cleanup(self, node, name):
1572 """Cleans up after an import or export. 1573 1574 This is a single-node call. 1575 1576 @type node: string 1577 @param node: Node name 1578 @type name: string 1579 @param name: Import/export name 1580 1581 """ 1582 return self._SingleNodeCall(node, "impexp_cleanup", [name])
1583