Package ganeti :: Module rpc
[hide private]
[frames] | no frames]

Source Code for Module ganeti.rpc

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Inter-node RPC library. 
  23   
  24  """ 
  25   
  26  # pylint: disable-msg=C0103,R0201,R0904 
  27  # C0103: Invalid name, since call_ are not valid 
  28  # R0201: Method could be a function, we keep all rpcs instance methods 
  29  # as not to change them back and forth between static/instance methods 
  30  # if they need to start using instance attributes 
  31  # R0904: Too many public methods 
  32   
  33  import os 
  34  import logging 
  35  import zlib 
  36  import base64 
  37  import pycurl 
  38  import threading 
  39   
  40  from ganeti import utils 
  41  from ganeti import objects 
  42  from ganeti import http 
  43  from ganeti import serializer 
  44  from ganeti import constants 
  45  from ganeti import errors 
  46  from ganeti import netutils 
  47  from ganeti import ssconf 
  48   
  49  # pylint has a bug here, doesn't see this import 
  50  import ganeti.http.client  # pylint: disable-msg=W0611 
  51   
  52   
  53  # Timeout for connecting to nodes (seconds) 
  54  _RPC_CONNECT_TIMEOUT = 5 
  55   
  56  _RPC_CLIENT_HEADERS = [ 
  57    "Content-type: %s" % http.HTTP_APP_JSON, 
  58    "Expect:", 
  59    ] 
  60   
  61  # Various time constants for the timeout table 
  62  _TMO_URGENT = 60 # one minute 
  63  _TMO_FAST = 5 * 60 # five minutes 
  64  _TMO_NORMAL = 15 * 60 # 15 minutes 
  65  _TMO_SLOW = 3600 # one hour 
  66  _TMO_4HRS = 4 * 3600 
  67  _TMO_1DAY = 86400 
  68   
  69  # Timeout table that will be built later by decorators 
  70  # Guidelines for choosing timeouts: 
  71  # - call used during watcher: timeout -> 1min, _TMO_URGENT 
  72  # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST 
  73  # - other calls: 15 min, _TMO_NORMAL 
  74  # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts 
  75   
  76  _TIMEOUTS = { 
  77  } 
78 79 80 -def Init():
81 """Initializes the module-global HTTP client manager. 82 83 Must be called before using any RPC function and while exactly one thread is 84 running. 85 86 """ 87 # curl_global_init(3) and curl_global_cleanup(3) must be called with only 88 # one thread running. This check is just a safety measure -- it doesn't 89 # cover all cases. 90 assert threading.activeCount() == 1, \ 91 "Found more than one active thread when initializing pycURL" 92 93 logging.info("Using PycURL %s", pycurl.version) 94 95 pycurl.global_init(pycurl.GLOBAL_ALL)
96
97 98 -def Shutdown():
99 """Stops the module-global HTTP client manager. 100 101 Must be called before quitting the program and while exactly one thread is 102 running. 103 104 """ 105 pycurl.global_cleanup()
106
107 108 -def _ConfigRpcCurl(curl):
109 noded_cert = str(constants.NODED_CERT_FILE) 110 111 curl.setopt(pycurl.FOLLOWLOCATION, False) 112 curl.setopt(pycurl.CAINFO, noded_cert) 113 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 114 curl.setopt(pycurl.SSL_VERIFYPEER, True) 115 curl.setopt(pycurl.SSLCERTTYPE, "PEM") 116 curl.setopt(pycurl.SSLCERT, noded_cert) 117 curl.setopt(pycurl.SSLKEYTYPE, "PEM") 118 curl.setopt(pycurl.SSLKEY, noded_cert) 119 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120 121 122 # Aliasing this module avoids the following warning by epydoc: "Warning: No 123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local" 124 _threading = threading
125 126 127 -class _RpcThreadLocal(_threading.local):
128 - def GetHttpClientPool(self):
129 """Returns a per-thread HTTP client pool. 130 131 @rtype: L{http.client.HttpClientPool} 132 133 """ 134 try: 135 pool = self.hcp 136 except AttributeError: 137 pool = http.client.HttpClientPool(_ConfigRpcCurl) 138 self.hcp = pool 139 140 return pool
141 142 143 # Remove module alias (see above) 144 del _threading 145 146 147 _thread_local = _RpcThreadLocal()
148 149 150 -def _RpcTimeout(secs):
151 """Timeout decorator. 152 153 When applied to a rpc call_* function, it updates the global timeout 154 table with the given function/timeout. 155 156 """ 157 def decorator(f): 158 name = f.__name__ 159 assert name.startswith("call_") 160 _TIMEOUTS[name[len("call_"):]] = secs 161 return f
162 return decorator 163
164 165 -def RunWithRPC(fn):
166 """RPC-wrapper decorator. 167 168 When applied to a function, it runs it with the RPC system 169 initialized, and it shutsdown the system afterwards. This means the 170 function must be called without RPC being initialized. 171 172 """ 173 def wrapper(*args, **kwargs): 174 Init() 175 try: 176 return fn(*args, **kwargs) 177 finally: 178 Shutdown()
179 return wrapper 180
181 182 -class RpcResult(object):
183 """RPC Result class. 184 185 This class holds an RPC result. It is needed since in multi-node 186 calls we can't raise an exception just because one one out of many 187 failed, and therefore we use this class to encapsulate the result. 188 189 @ivar data: the data payload, for successful results, or None 190 @ivar call: the name of the RPC call 191 @ivar node: the name of the node to which we made the call 192 @ivar offline: whether the operation failed because the node was 193 offline, as opposed to actual failure; offline=True will always 194 imply failed=True, in order to allow simpler checking if 195 the user doesn't care about the exact failure mode 196 @ivar fail_msg: the error message if the call failed 197 198 """
199 - def __init__(self, data=None, failed=False, offline=False, 200 call=None, node=None):
201 self.offline = offline 202 self.call = call 203 self.node = node 204 205 if offline: 206 self.fail_msg = "Node is marked offline" 207 self.data = self.payload = None 208 elif failed: 209 self.fail_msg = self._EnsureErr(data) 210 self.data = self.payload = None 211 else: 212 self.data = data 213 if not isinstance(self.data, (tuple, list)): 214 self.fail_msg = ("RPC layer error: invalid result type (%s)" % 215 type(self.data)) 216 self.payload = None 217 elif len(data) != 2: 218 self.fail_msg = ("RPC layer error: invalid result length (%d), " 219 "expected 2" % len(self.data)) 220 self.payload = None 221 elif not self.data[0]: 222 self.fail_msg = self._EnsureErr(self.data[1]) 223 self.payload = None 224 else: 225 # finally success 226 self.fail_msg = None 227 self.payload = data[1] 228 229 assert hasattr(self, "call") 230 assert hasattr(self, "data") 231 assert hasattr(self, "fail_msg") 232 assert hasattr(self, "node") 233 assert hasattr(self, "offline") 234 assert hasattr(self, "payload")
235 236 @staticmethod
237 - def _EnsureErr(val):
238 """Helper to ensure we return a 'True' value for error.""" 239 if val: 240 return val 241 else: 242 return "No error information"
243
244 - def Raise(self, msg, prereq=False, ecode=None):
245 """If the result has failed, raise an OpExecError. 246 247 This is used so that LU code doesn't have to check for each 248 result, but instead can call this function. 249 250 """ 251 if not self.fail_msg: 252 return 253 254 if not msg: # one could pass None for default message 255 msg = ("Call '%s' to node '%s' has failed: %s" % 256 (self.call, self.node, self.fail_msg)) 257 else: 258 msg = "%s: %s" % (msg, self.fail_msg) 259 if prereq: 260 ec = errors.OpPrereqError 261 else: 262 ec = errors.OpExecError 263 if ecode is not None: 264 args = (msg, ecode) 265 else: 266 args = (msg, ) 267 raise ec(*args) # pylint: disable-msg=W0142
268
269 270 -def _AddressLookup(node_list, 271 ssc=ssconf.SimpleStore, 272 nslookup_fn=netutils.Hostname.GetIP):
273 """Return addresses for given node names. 274 275 @type node_list: list 276 @param node_list: List of node names 277 @type ssc: class 278 @param ssc: SimpleStore class that is used to obtain node->ip mappings 279 @type nslookup_fn: callable 280 @param nslookup_fn: function use to do NS lookup 281 @rtype: list of addresses and/or None's 282 @returns: List of corresponding addresses, if found 283 284 """ 285 ss = ssc() 286 iplist = ss.GetNodePrimaryIPList() 287 family = ss.GetPrimaryIPFamily() 288 addresses = [] 289 ipmap = dict(entry.split() for entry in iplist) 290 for node in node_list: 291 address = ipmap.get(node) 292 if address is None: 293 address = nslookup_fn(node, family=family) 294 addresses.append(address) 295 296 return addresses
297
298 299 -class Client:
300 """RPC Client class. 301 302 This class, given a (remote) method name, a list of parameters and a 303 list of nodes, will contact (in parallel) all nodes, and return a 304 dict of results (key: node name, value: result). 305 306 One current bug is that generic failure is still signaled by 307 'False' result, which is not good. This overloading of values can 308 cause bugs. 309 310 """
311 - def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312 assert procedure in _TIMEOUTS, ("New RPC call not declared in the" 313 " timeouts table") 314 self.procedure = procedure 315 self.body = body 316 self.port = port 317 self._request = {} 318 self._address_lookup_fn = address_lookup_fn
319
320 - def ConnectList(self, node_list, address_list=None, read_timeout=None):
321 """Add a list of nodes to the target nodes. 322 323 @type node_list: list 324 @param node_list: the list of node names to connect 325 @type address_list: list or None 326 @keyword address_list: either None or a list with node addresses, 327 which must have the same length as the node list 328 @type read_timeout: int 329 @param read_timeout: overwrites default timeout for operation 330 331 """ 332 if address_list is None: 333 # Always use IP address instead of node name 334 address_list = self._address_lookup_fn(node_list) 335 336 assert len(node_list) == len(address_list), \ 337 "Name and address lists must have the same length" 338 339 for node, address in zip(node_list, address_list): 340 self.ConnectNode(node, address, read_timeout=read_timeout)
341
342 - def ConnectNode(self, name, address=None, read_timeout=None):
343 """Add a node to the target list. 344 345 @type name: str 346 @param name: the node name 347 @type address: str 348 @param address: the node address, if known 349 @type read_timeout: int 350 @param read_timeout: overwrites default timeout for operation 351 352 """ 353 if address is None: 354 # Always use IP address instead of node name 355 address = self._address_lookup_fn([name])[0] 356 357 assert(address is not None) 358 359 if read_timeout is None: 360 read_timeout = _TIMEOUTS[self.procedure] 361 362 self._request[name] = \ 363 http.client.HttpClientRequest(str(address), self.port, 364 http.HTTP_PUT, str("/%s" % self.procedure), 365 headers=_RPC_CLIENT_HEADERS, 366 post_data=str(self.body), 367 read_timeout=read_timeout)
368
369 - def GetResults(self, http_pool=None):
370 """Call nodes and return results. 371 372 @rtype: list 373 @return: List of RPC results 374 375 """ 376 if not http_pool: 377 http_pool = _thread_local.GetHttpClientPool() 378 379 http_pool.ProcessRequests(self._request.values()) 380 381 results = {} 382 383 for name, req in self._request.iteritems(): 384 if req.success and req.resp_status_code == http.HTTP_OK: 385 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body), 386 node=name, call=self.procedure) 387 continue 388 389 # TODO: Better error reporting 390 if req.error: 391 msg = req.error 392 else: 393 msg = req.resp_body 394 395 logging.error("RPC error in %s from node %s: %s", 396 self.procedure, name, msg) 397 results[name] = RpcResult(data=msg, failed=True, node=name, 398 call=self.procedure) 399 400 return results
401
402 403 -def _EncodeImportExportIO(ieio, ieioargs):
404 """Encodes import/export I/O information. 405 406 """ 407 if ieio == constants.IEIO_RAW_DISK: 408 assert len(ieioargs) == 1 409 return (ieioargs[0].ToDict(), ) 410 411 if ieio == constants.IEIO_SCRIPT: 412 assert len(ieioargs) == 2 413 return (ieioargs[0].ToDict(), ieioargs[1]) 414 415 return ieioargs
416
417 418 -class RpcRunner(object):
419 """RPC runner class""" 420
421 - def __init__(self, cfg):
422 """Initialized the rpc runner. 423 424 @type cfg: C{config.ConfigWriter} 425 @param cfg: the configuration object that will be used to get data 426 about the cluster 427 428 """ 429 self._cfg = cfg 430 self.port = netutils.GetDaemonPort(constants.NODED)
431
432 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433 """Convert the given instance to a dict. 434 435 This is done via the instance's ToDict() method and additionally 436 we fill the hvparams with the cluster defaults. 437 438 @type instance: L{objects.Instance} 439 @param instance: an Instance object 440 @type hvp: dict or None 441 @param hvp: a dictionary with overridden hypervisor parameters 442 @type bep: dict or None 443 @param bep: a dictionary with overridden backend parameters 444 @type osp: dict or None 445 @param osp: a dictionary with overridden os parameters 446 @rtype: dict 447 @return: the instance dict, with the hvparams filled with the 448 cluster defaults 449 450 """ 451 idict = instance.ToDict() 452 cluster = self._cfg.GetClusterInfo() 453 idict["hvparams"] = cluster.FillHV(instance) 454 if hvp is not None: 455 idict["hvparams"].update(hvp) 456 idict["beparams"] = cluster.FillBE(instance) 457 if bep is not None: 458 idict["beparams"].update(bep) 459 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams) 460 if osp is not None: 461 idict["osparams"].update(osp) 462 for nic in idict["nics"]: 463 nic['nicparams'] = objects.FillDict( 464 cluster.nicparams[constants.PP_DEFAULT], 465 nic['nicparams']) 466 return idict
467
468 - def _ConnectList(self, client, node_list, call, read_timeout=None):
469 """Helper for computing node addresses. 470 471 @type client: L{ganeti.rpc.Client} 472 @param client: a C{Client} instance 473 @type node_list: list 474 @param node_list: the node list we should connect 475 @type call: string 476 @param call: the name of the remote procedure call, for filling in 477 correctly any eventual offline nodes' results 478 @type read_timeout: int 479 @param read_timeout: overwrites the default read timeout for the 480 given operation 481 482 """ 483 all_nodes = self._cfg.GetAllNodesInfo() 484 name_list = [] 485 addr_list = [] 486 skip_dict = {} 487 for node in node_list: 488 if node in all_nodes: 489 if all_nodes[node].offline: 490 skip_dict[node] = RpcResult(node=node, offline=True, call=call) 491 continue 492 val = all_nodes[node].primary_ip 493 else: 494 val = None 495 addr_list.append(val) 496 name_list.append(node) 497 if name_list: 498 client.ConnectList(name_list, address_list=addr_list, 499 read_timeout=read_timeout) 500 return skip_dict
501
502 - def _ConnectNode(self, client, node, call, read_timeout=None):
503 """Helper for computing one node's address. 504 505 @type client: L{ganeti.rpc.Client} 506 @param client: a C{Client} instance 507 @type node: str 508 @param node: the node we should connect 509 @type call: string 510 @param call: the name of the remote procedure call, for filling in 511 correctly any eventual offline nodes' results 512 @type read_timeout: int 513 @param read_timeout: overwrites the default read timeout for the 514 given operation 515 516 """ 517 node_info = self._cfg.GetNodeInfo(node) 518 if node_info is not None: 519 if node_info.offline: 520 return RpcResult(node=node, offline=True, call=call) 521 addr = node_info.primary_ip 522 else: 523 addr = None 524 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525
526 - def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527 """Helper for making a multi-node call 528 529 """ 530 body = serializer.DumpJson(args, indent=False) 531 c = Client(procedure, body, self.port) 532 skip_dict = self._ConnectList(c, node_list, procedure, 533 read_timeout=read_timeout) 534 skip_dict.update(c.GetResults()) 535 return skip_dict
536 537 @classmethod
538 - def _StaticMultiNodeCall(cls, node_list, procedure, args, 539 address_list=None, read_timeout=None):
540 """Helper for making a multi-node static call 541 542 """ 543 body = serializer.DumpJson(args, indent=False) 544 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED)) 545 c.ConnectList(node_list, address_list=address_list, 546 read_timeout=read_timeout) 547 return c.GetResults()
548
549 - def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550 """Helper for making a single-node call 551 552 """ 553 body = serializer.DumpJson(args, indent=False) 554 c = Client(procedure, body, self.port) 555 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout) 556 if result is None: 557 # we did connect, node is not offline 558 result = c.GetResults()[node] 559 return result
560 561 @classmethod
562 - def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563 """Helper for making a single-node static call 564 565 """ 566 body = serializer.DumpJson(args, indent=False) 567 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED)) 568 c.ConnectNode(node, read_timeout=read_timeout) 569 return c.GetResults()[node]
570 571 @staticmethod
572 - def _Compress(data):
573 """Compresses a string for transport over RPC. 574 575 Small amounts of data are not compressed. 576 577 @type data: str 578 @param data: Data 579 @rtype: tuple 580 @return: Encoded data to send 581 582 """ 583 # Small amounts of data are not compressed 584 if len(data) < 512: 585 return (constants.RPC_ENCODING_NONE, data) 586 587 # Compress with zlib and encode in base64 588 return (constants.RPC_ENCODING_ZLIB_BASE64, 589 base64.b64encode(zlib.compress(data, 3)))
590 591 # 592 # Begin RPC calls 593 # 594 595 @_RpcTimeout(_TMO_URGENT)
596 - def call_lv_list(self, node_list, vg_name):
597 """Gets the logical volumes present in a given volume group. 598 599 This is a multi-node call. 600 601 """ 602 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603 604 @_RpcTimeout(_TMO_URGENT)
605 - def call_vg_list(self, node_list):
606 """Gets the volume group list. 607 608 This is a multi-node call. 609 610 """ 611 return self._MultiNodeCall(node_list, "vg_list", [])
612 613 @_RpcTimeout(_TMO_NORMAL)
614 - def call_storage_list(self, node_list, su_name, su_args, name, fields):
615 """Get list of storage units. 616 617 This is a multi-node call. 618 619 """ 620 return self._MultiNodeCall(node_list, "storage_list", 621 [su_name, su_args, name, fields])
622 623 @_RpcTimeout(_TMO_NORMAL)
624 - def call_storage_modify(self, node, su_name, su_args, name, changes):
625 """Modify a storage unit. 626 627 This is a single-node call. 628 629 """ 630 return self._SingleNodeCall(node, "storage_modify", 631 [su_name, su_args, name, changes])
632 633 @_RpcTimeout(_TMO_NORMAL)
634 - def call_storage_execute(self, node, su_name, su_args, name, op):
635 """Executes an operation on a storage unit. 636 637 This is a single-node call. 638 639 """ 640 return self._SingleNodeCall(node, "storage_execute", 641 [su_name, su_args, name, op])
642 643 @_RpcTimeout(_TMO_URGENT)
644 - def call_bridges_exist(self, node, bridges_list):
645 """Checks if a node has all the bridges given. 646 647 This method checks if all bridges given in the bridges_list are 648 present on the remote node, so that an instance that uses interfaces 649 on those bridges can be started. 650 651 This is a single-node call. 652 653 """ 654 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655 656 @_RpcTimeout(_TMO_NORMAL)
657 - def call_instance_start(self, node, instance, hvp, bep):
658 """Starts an instance. 659 660 This is a single-node call. 661 662 """ 663 idict = self._InstDict(instance, hvp=hvp, bep=bep) 664 return self._SingleNodeCall(node, "instance_start", [idict])
665 666 @_RpcTimeout(_TMO_NORMAL)
667 - def call_instance_shutdown(self, node, instance, timeout):
668 """Stops an instance. 669 670 This is a single-node call. 671 672 """ 673 return self._SingleNodeCall(node, "instance_shutdown", 674 [self._InstDict(instance), timeout])
675 676 @_RpcTimeout(_TMO_NORMAL)
677 - def call_migration_info(self, node, instance):
678 """Gather the information necessary to prepare an instance migration. 679 680 This is a single-node call. 681 682 @type node: string 683 @param node: the node on which the instance is currently running 684 @type instance: C{objects.Instance} 685 @param instance: the instance definition 686 687 """ 688 return self._SingleNodeCall(node, "migration_info", 689 [self._InstDict(instance)])
690 691 @_RpcTimeout(_TMO_NORMAL)
692 - def call_accept_instance(self, node, instance, info, target):
693 """Prepare a node to accept an instance. 694 695 This is a single-node call. 696 697 @type node: string 698 @param node: the target node for the migration 699 @type instance: C{objects.Instance} 700 @param instance: the instance definition 701 @type info: opaque/hypervisor specific (string/data) 702 @param info: result for the call_migration_info call 703 @type target: string 704 @param target: target hostname (usually ip address) (on the node itself) 705 706 """ 707 return self._SingleNodeCall(node, "accept_instance", 708 [self._InstDict(instance), info, target])
709 710 @_RpcTimeout(_TMO_NORMAL)
711 - def call_finalize_migration(self, node, instance, info, success):
712 """Finalize any target-node migration specific operation. 713 714 This is called both in case of a successful migration and in case of error 715 (in which case it should abort the migration). 716 717 This is a single-node call. 718 719 @type node: string 720 @param node: the target node for the migration 721 @type instance: C{objects.Instance} 722 @param instance: the instance definition 723 @type info: opaque/hypervisor specific (string/data) 724 @param info: result for the call_migration_info call 725 @type success: boolean 726 @param success: whether the migration was a success or a failure 727 728 """ 729 return self._SingleNodeCall(node, "finalize_migration", 730 [self._InstDict(instance), info, success])
731 732 @_RpcTimeout(_TMO_SLOW)
733 - def call_instance_migrate(self, node, instance, target, live):
734 """Migrate an instance. 735 736 This is a single-node call. 737 738 @type node: string 739 @param node: the node on which the instance is currently running 740 @type instance: C{objects.Instance} 741 @param instance: the instance definition 742 @type target: string 743 @param target: the target node name 744 @type live: boolean 745 @param live: whether the migration should be done live or not (the 746 interpretation of this parameter is left to the hypervisor) 747 748 """ 749 return self._SingleNodeCall(node, "instance_migrate", 750 [self._InstDict(instance), target, live])
751 752 @_RpcTimeout(_TMO_NORMAL)
753 - def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754 """Reboots an instance. 755 756 This is a single-node call. 757 758 """ 759 return self._SingleNodeCall(node, "instance_reboot", 760 [self._InstDict(inst), reboot_type, 761 shutdown_timeout])
762 763 @_RpcTimeout(_TMO_1DAY)
764 - def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765 """Installs an OS on the given instance. 766 767 This is a single-node call. 768 769 """ 770 return self._SingleNodeCall(node, "instance_os_add", 771 [self._InstDict(inst, osp=osparams), 772 reinstall, debug])
773 774 @_RpcTimeout(_TMO_SLOW)
775 - def call_instance_run_rename(self, node, inst, old_name, debug):
776 """Run the OS rename script for an instance. 777 778 This is a single-node call. 779 780 """ 781 return self._SingleNodeCall(node, "instance_run_rename", 782 [self._InstDict(inst), old_name, debug])
783 784 @_RpcTimeout(_TMO_URGENT)
785 - def call_instance_info(self, node, instance, hname):
786 """Returns information about a single instance. 787 788 This is a single-node call. 789 790 @type node: list 791 @param node: the list of nodes to query 792 @type instance: string 793 @param instance: the instance name 794 @type hname: string 795 @param hname: the hypervisor type of the instance 796 797 """ 798 return self._SingleNodeCall(node, "instance_info", [instance, hname])
799 800 @_RpcTimeout(_TMO_NORMAL)
801 - def call_instance_migratable(self, node, instance):
802 """Checks whether the given instance can be migrated. 803 804 This is a single-node call. 805 806 @param node: the node to query 807 @type instance: L{objects.Instance} 808 @param instance: the instance to check 809 810 811 """ 812 return self._SingleNodeCall(node, "instance_migratable", 813 [self._InstDict(instance)])
814 815 @_RpcTimeout(_TMO_URGENT)
816 - def call_all_instances_info(self, node_list, hypervisor_list):
817 """Returns information about all instances on the given nodes. 818 819 This is a multi-node call. 820 821 @type node_list: list 822 @param node_list: the list of nodes to query 823 @type hypervisor_list: list 824 @param hypervisor_list: the hypervisors to query for instances 825 826 """ 827 return self._MultiNodeCall(node_list, "all_instances_info", 828 [hypervisor_list])
829 830 @_RpcTimeout(_TMO_URGENT)
831 - def call_instance_list(self, node_list, hypervisor_list):
832 """Returns the list of running instances on a given node. 833 834 This is a multi-node call. 835 836 @type node_list: list 837 @param node_list: the list of nodes to query 838 @type hypervisor_list: list 839 @param hypervisor_list: the hypervisors to query for instances 840 841 """ 842 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843 844 @_RpcTimeout(_TMO_FAST)
845 - def call_node_tcp_ping(self, node, source, target, port, timeout, 846 live_port_needed):
847 """Do a TcpPing on the remote node 848 849 This is a single-node call. 850 851 """ 852 return self._SingleNodeCall(node, "node_tcp_ping", 853 [source, target, port, timeout, 854 live_port_needed])
855 856 @_RpcTimeout(_TMO_FAST)
857 - def call_node_has_ip_address(self, node, address):
858 """Checks if a node has the given IP address. 859 860 This is a single-node call. 861 862 """ 863 return self._SingleNodeCall(node, "node_has_ip_address", [address])
864 865 @_RpcTimeout(_TMO_URGENT)
866 - def call_node_info(self, node_list, vg_name, hypervisor_type):
867 """Return node information. 868 869 This will return memory information and volume group size and free 870 space. 871 872 This is a multi-node call. 873 874 @type node_list: list 875 @param node_list: the list of nodes to query 876 @type vg_name: C{string} 877 @param vg_name: the name of the volume group to ask for disk space 878 information 879 @type hypervisor_type: C{str} 880 @param hypervisor_type: the name of the hypervisor to ask for 881 memory information 882 883 """ 884 return self._MultiNodeCall(node_list, "node_info", 885 [vg_name, hypervisor_type])
886 887 @_RpcTimeout(_TMO_NORMAL)
888 - def call_etc_hosts_modify(self, node, mode, name, ip):
889 """Modify hosts file with name 890 891 @type node: string 892 @param node: The node to call 893 @type mode: string 894 @param mode: The mode to operate. Currently "add" or "remove" 895 @type name: string 896 @param name: The host name to be modified 897 @type ip: string 898 @param ip: The ip of the entry (just valid if mode is "add") 899 900 """ 901 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902 903 @_RpcTimeout(_TMO_NORMAL)
904 - def call_node_verify(self, node_list, checkdict, cluster_name):
905 """Request verification of given parameters. 906 907 This is a multi-node call. 908 909 """ 910 return self._MultiNodeCall(node_list, "node_verify", 911 [checkdict, cluster_name])
912 913 @classmethod 914 @_RpcTimeout(_TMO_FAST)
915 - def call_node_start_master(cls, node, start_daemons, no_voting):
916 """Tells a node to activate itself as a master. 917 918 This is a single-node call. 919 920 """ 921 return cls._StaticSingleNodeCall(node, "node_start_master", 922 [start_daemons, no_voting])
923 924 @classmethod 925 @_RpcTimeout(_TMO_FAST)
926 - def call_node_stop_master(cls, node, stop_daemons):
927 """Tells a node to demote itself from master status. 928 929 This is a single-node call. 930 931 """ 932 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933 934 @classmethod 935 @_RpcTimeout(_TMO_URGENT)
936 - def call_master_info(cls, node_list):
937 """Query master info. 938 939 This is a multi-node call. 940 941 """ 942 # TODO: should this method query down nodes? 943 return cls._StaticMultiNodeCall(node_list, "master_info", [])
944 945 @classmethod 946 @_RpcTimeout(_TMO_URGENT)
947 - def call_version(cls, node_list):
948 """Query node version. 949 950 This is a multi-node call. 951 952 """ 953 return cls._StaticMultiNodeCall(node_list, "version", [])
954 955 @_RpcTimeout(_TMO_NORMAL)
956 - def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957 """Request creation of a given block device. 958 959 This is a single-node call. 960 961 """ 962 return self._SingleNodeCall(node, "blockdev_create", 963 [bdev.ToDict(), size, owner, on_primary, info])
964 965 @_RpcTimeout(_TMO_SLOW)
966 - def call_blockdev_wipe(self, node, bdev, offset, size):
967 """Request wipe at given offset with given size of a block device. 968 969 This is a single-node call. 970 971 """ 972 return self._SingleNodeCall(node, "blockdev_wipe", 973 [bdev.ToDict(), offset, size])
974 975 @_RpcTimeout(_TMO_NORMAL)
976 - def call_blockdev_remove(self, node, bdev):
977 """Request removal of a given block device. 978 979 This is a single-node call. 980 981 """ 982 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983 984 @_RpcTimeout(_TMO_NORMAL)
985 - def call_blockdev_rename(self, node, devlist):
986 """Request rename of the given block devices. 987 988 This is a single-node call. 989 990 """ 991 return self._SingleNodeCall(node, "blockdev_rename", 992 [(d.ToDict(), uid) for d, uid in devlist])
993 994 @_RpcTimeout(_TMO_NORMAL)
995 - def call_blockdev_assemble(self, node, disk, owner, on_primary):
996 """Request assembling of a given block device. 997 998 This is a single-node call. 999 1000 """ 1001 return self._SingleNodeCall(node, "blockdev_assemble", 1002 [disk.ToDict(), owner, on_primary])
1003 1004 @_RpcTimeout(_TMO_NORMAL)
1005 - def call_blockdev_shutdown(self, node, disk):
1006 """Request shutdown of a given block device. 1007 1008 This is a single-node call. 1009 1010 """ 1011 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012 1013 @_RpcTimeout(_TMO_NORMAL)
1014 - def call_blockdev_addchildren(self, node, bdev, ndevs):
1015 """Request adding a list of children to a (mirroring) device. 1016 1017 This is a single-node call. 1018 1019 """ 1020 return self._SingleNodeCall(node, "blockdev_addchildren", 1021 [bdev.ToDict(), 1022 [disk.ToDict() for disk in ndevs]])
1023 1024 @_RpcTimeout(_TMO_NORMAL)
1025 - def call_blockdev_removechildren(self, node, bdev, ndevs):
1026 """Request removing a list of children from a (mirroring) device. 1027 1028 This is a single-node call. 1029 1030 """ 1031 return self._SingleNodeCall(node, "blockdev_removechildren", 1032 [bdev.ToDict(), 1033 [disk.ToDict() for disk in ndevs]])
1034 1035 @_RpcTimeout(_TMO_NORMAL)
1036 - def call_blockdev_getmirrorstatus(self, node, disks):
1037 """Request status of a (mirroring) device. 1038 1039 This is a single-node call. 1040 1041 """ 1042 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus", 1043 [dsk.ToDict() for dsk in disks]) 1044 if not result.fail_msg: 1045 result.payload = [objects.BlockDevStatus.FromDict(i) 1046 for i in result.payload] 1047 return result
1048 1049 @_RpcTimeout(_TMO_NORMAL)
1050 - def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051 """Request status of (mirroring) devices from multiple nodes. 1052 1053 This is a multi-node call. 1054 1055 """ 1056 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", 1057 [dict((name, [dsk.ToDict() for dsk in disks]) 1058 for name, disks in node_disks.items())]) 1059 for nres in result.values(): 1060 if nres.fail_msg: 1061 continue 1062 1063 for idx, (success, status) in enumerate(nres.payload): 1064 if success: 1065 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) 1066 1067 return result
1068 1069 @_RpcTimeout(_TMO_NORMAL)
1070 - def call_blockdev_find(self, node, disk):
1071 """Request identification of a given block device. 1072 1073 This is a single-node call. 1074 1075 """ 1076 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()]) 1077 if not result.fail_msg and result.payload is not None: 1078 result.payload = objects.BlockDevStatus.FromDict(result.payload) 1079 return result
1080 1081 @_RpcTimeout(_TMO_NORMAL)
1082 - def call_blockdev_close(self, node, instance_name, disks):
1083 """Closes the given block devices. 1084 1085 This is a single-node call. 1086 1087 """ 1088 params = [instance_name, [cf.ToDict() for cf in disks]] 1089 return self._SingleNodeCall(node, "blockdev_close", params)
1090 1091 @_RpcTimeout(_TMO_NORMAL)
1092 - def call_blockdev_getsizes(self, node, disks):
1093 """Returns the size of the given disks. 1094 1095 This is a single-node call. 1096 1097 """ 1098 params = [[cf.ToDict() for cf in disks]] 1099 return self._SingleNodeCall(node, "blockdev_getsize", params)
1100 1101 @_RpcTimeout(_TMO_NORMAL)
1102 - def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1103 """Disconnects the network of the given drbd devices. 1104 1105 This is a multi-node call. 1106 1107 """ 1108 return self._MultiNodeCall(node_list, "drbd_disconnect_net", 1109 [nodes_ip, [cf.ToDict() for cf in disks]])
1110 1111 @_RpcTimeout(_TMO_NORMAL)
1112 - def call_drbd_attach_net(self, node_list, nodes_ip, 1113 disks, instance_name, multimaster):
1114 """Disconnects the given drbd devices. 1115 1116 This is a multi-node call. 1117 1118 """ 1119 return self._MultiNodeCall(node_list, "drbd_attach_net", 1120 [nodes_ip, [cf.ToDict() for cf in disks], 1121 instance_name, multimaster])
1122 1123 @_RpcTimeout(_TMO_SLOW)
1124 - def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1125 """Waits for the synchronization of drbd devices is complete. 1126 1127 This is a multi-node call. 1128 1129 """ 1130 return self._MultiNodeCall(node_list, "drbd_wait_sync", 1131 [nodes_ip, [cf.ToDict() for cf in disks]])
1132 1133 @_RpcTimeout(_TMO_URGENT)
1134 - def call_drbd_helper(self, node_list):
1135 """Gets drbd helper. 1136 1137 This is a multi-node call. 1138 1139 """ 1140 return self._MultiNodeCall(node_list, "drbd_helper", [])
1141 1142 @classmethod 1143 @_RpcTimeout(_TMO_NORMAL)
1144 - def call_upload_file(cls, node_list, file_name, address_list=None):
1145 """Upload a file. 1146 1147 The node will refuse the operation in case the file is not on the 1148 approved file list. 1149 1150 This is a multi-node call. 1151 1152 @type node_list: list 1153 @param node_list: the list of node names to upload to 1154 @type file_name: str 1155 @param file_name: the filename to upload 1156 @type address_list: list or None 1157 @keyword address_list: an optional list of node addresses, in order 1158 to optimize the RPC speed 1159 1160 """ 1161 file_contents = utils.ReadFile(file_name) 1162 data = cls._Compress(file_contents) 1163 st = os.stat(file_name) 1164 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, 1165 st.st_atime, st.st_mtime] 1166 return cls._StaticMultiNodeCall(node_list, "upload_file", params, 1167 address_list=address_list)
1168 1169 @classmethod 1170 @_RpcTimeout(_TMO_NORMAL)
1171 - def call_write_ssconf_files(cls, node_list, values):
1172 """Write ssconf files. 1173 1174 This is a multi-node call. 1175 1176 """ 1177 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1178 1179 @_RpcTimeout(_TMO_FAST)
1180 - def call_os_diagnose(self, node_list):
1181 """Request a diagnose of OS definitions. 1182 1183 This is a multi-node call. 1184 1185 """ 1186 return self._MultiNodeCall(node_list, "os_diagnose", [])
1187 1188 @_RpcTimeout(_TMO_FAST)
1189 - def call_os_get(self, node, name):
1190 """Returns an OS definition. 1191 1192 This is a single-node call. 1193 1194 """ 1195 result = self._SingleNodeCall(node, "os_get", [name]) 1196 if not result.fail_msg and isinstance(result.payload, dict): 1197 result.payload = objects.OS.FromDict(result.payload) 1198 return result
1199 1200 @_RpcTimeout(_TMO_FAST)
1201 - def call_os_validate(self, required, nodes, name, checks, params):
1202 """Run a validation routine for a given OS. 1203 1204 This is a multi-node call. 1205 1206 """ 1207 return self._MultiNodeCall(nodes, "os_validate", 1208 [required, name, checks, params])
1209 1210 @_RpcTimeout(_TMO_NORMAL)
1211 - def call_hooks_runner(self, node_list, hpath, phase, env):
1212 """Call the hooks runner. 1213 1214 Args: 1215 - op: the OpCode instance 1216 - env: a dictionary with the environment 1217 1218 This is a multi-node call. 1219 1220 """ 1221 params = [hpath, phase, env] 1222 return self._MultiNodeCall(node_list, "hooks_runner", params)
1223 1224 @_RpcTimeout(_TMO_NORMAL)
1225 - def call_iallocator_runner(self, node, name, idata):
1226 """Call an iallocator on a remote node 1227 1228 Args: 1229 - name: the iallocator name 1230 - input: the json-encoded input string 1231 1232 This is a single-node call. 1233 1234 """ 1235 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1236 1237 @_RpcTimeout(_TMO_NORMAL)
1238 - def call_blockdev_grow(self, node, cf_bdev, amount):
1239 """Request a snapshot of the given block device. 1240 1241 This is a single-node call. 1242 1243 """ 1244 return self._SingleNodeCall(node, "blockdev_grow", 1245 [cf_bdev.ToDict(), amount])
1246 1247 @_RpcTimeout(_TMO_1DAY)
1248 - def call_blockdev_export(self, node, cf_bdev, 1249 dest_node, dest_path, cluster_name):
1250 """Export a given disk to another node. 1251 1252 This is a single-node call. 1253 1254 """ 1255 return self._SingleNodeCall(node, "blockdev_export", 1256 [cf_bdev.ToDict(), dest_node, dest_path, 1257 cluster_name])
1258 1259 @_RpcTimeout(_TMO_NORMAL)
1260 - def call_blockdev_snapshot(self, node, cf_bdev):
1261 """Request a snapshot of the given block device. 1262 1263 This is a single-node call. 1264 1265 """ 1266 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1267 1268 @_RpcTimeout(_TMO_NORMAL)
1269 - def call_finalize_export(self, node, instance, snap_disks):
1270 """Request the completion of an export operation. 1271 1272 This writes the export config file, etc. 1273 1274 This is a single-node call. 1275 1276 """ 1277 flat_disks = [] 1278 for disk in snap_disks: 1279 if isinstance(disk, bool): 1280 flat_disks.append(disk) 1281 else: 1282 flat_disks.append(disk.ToDict()) 1283 1284 return self._SingleNodeCall(node, "finalize_export", 1285 [self._InstDict(instance), flat_disks])
1286 1287 @_RpcTimeout(_TMO_FAST)
1288 - def call_export_info(self, node, path):
1289 """Queries the export information in a given path. 1290 1291 This is a single-node call. 1292 1293 """ 1294 return self._SingleNodeCall(node, "export_info", [path])
1295 1296 @_RpcTimeout(_TMO_FAST)
1297 - def call_export_list(self, node_list):
1298 """Gets the stored exports list. 1299 1300 This is a multi-node call. 1301 1302 """ 1303 return self._MultiNodeCall(node_list, "export_list", [])
1304 1305 @_RpcTimeout(_TMO_FAST)
1306 - def call_export_remove(self, node, export):
1307 """Requests removal of a given export. 1308 1309 This is a single-node call. 1310 1311 """ 1312 return self._SingleNodeCall(node, "export_remove", [export])
1313 1314 @classmethod 1315 @_RpcTimeout(_TMO_NORMAL)
1316 - def call_node_leave_cluster(cls, node, modify_ssh_setup):
1317 """Requests a node to clean the cluster information it has. 1318 1319 This will remove the configuration information from the ganeti data 1320 dir. 1321 1322 This is a single-node call. 1323 1324 """ 1325 return cls._StaticSingleNodeCall(node, "node_leave_cluster", 1326 [modify_ssh_setup])
1327 1328 @_RpcTimeout(_TMO_FAST)
1329 - def call_node_volumes(self, node_list):
1330 """Gets all volumes on node(s). 1331 1332 This is a multi-node call. 1333 1334 """ 1335 return self._MultiNodeCall(node_list, "node_volumes", [])
1336 1337 @_RpcTimeout(_TMO_FAST)
1338 - def call_node_demote_from_mc(self, node):
1339 """Demote a node from the master candidate role. 1340 1341 This is a single-node call. 1342 1343 """ 1344 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1345 1346 @_RpcTimeout(_TMO_NORMAL)
1347 - def call_node_powercycle(self, node, hypervisor):
1348 """Tries to powercycle a node. 1349 1350 This is a single-node call. 1351 1352 """ 1353 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1354 1355 @_RpcTimeout(None)
1356 - def call_test_delay(self, node_list, duration):
1357 """Sleep for a fixed time on given node(s). 1358 1359 This is a multi-node call. 1360 1361 """ 1362 return self._MultiNodeCall(node_list, "test_delay", [duration], 1363 read_timeout=int(duration + 5))
1364 1365 @_RpcTimeout(_TMO_FAST)
1366 - def call_file_storage_dir_create(self, node, file_storage_dir):
1367 """Create the given file storage directory. 1368 1369 This is a single-node call. 1370 1371 """ 1372 return self._SingleNodeCall(node, "file_storage_dir_create", 1373 [file_storage_dir])
1374 1375 @_RpcTimeout(_TMO_FAST)
1376 - def call_file_storage_dir_remove(self, node, file_storage_dir):
1377 """Remove the given file storage directory. 1378 1379 This is a single-node call. 1380 1381 """ 1382 return self._SingleNodeCall(node, "file_storage_dir_remove", 1383 [file_storage_dir])
1384 1385 @_RpcTimeout(_TMO_FAST)
1386 - def call_file_storage_dir_rename(self, node, old_file_storage_dir, 1387 new_file_storage_dir):
1388 """Rename file storage directory. 1389 1390 This is a single-node call. 1391 1392 """ 1393 return self._SingleNodeCall(node, "file_storage_dir_rename", 1394 [old_file_storage_dir, new_file_storage_dir])
1395 1396 @classmethod 1397 @_RpcTimeout(_TMO_FAST)
1398 - def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1399 """Update job queue. 1400 1401 This is a multi-node call. 1402 1403 """ 1404 return cls._StaticMultiNodeCall(node_list, "jobqueue_update", 1405 [file_name, cls._Compress(content)], 1406 address_list=address_list)
1407 1408 @classmethod 1409 @_RpcTimeout(_TMO_NORMAL)
1410 - def call_jobqueue_purge(cls, node):
1411 """Purge job queue. 1412 1413 This is a single-node call. 1414 1415 """ 1416 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1417 1418 @classmethod 1419 @_RpcTimeout(_TMO_FAST)
1420 - def call_jobqueue_rename(cls, node_list, address_list, rename):
1421 """Rename a job queue file. 1422 1423 This is a multi-node call. 1424 1425 """ 1426 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename, 1427 address_list=address_list)
1428 1429 @_RpcTimeout(_TMO_NORMAL)
1430 - def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1431 """Validate the hypervisor params. 1432 1433 This is a multi-node call. 1434 1435 @type node_list: list 1436 @param node_list: the list of nodes to query 1437 @type hvname: string 1438 @param hvname: the hypervisor name 1439 @type hvparams: dict 1440 @param hvparams: the hypervisor parameters to be validated 1441 1442 """ 1443 cluster = self._cfg.GetClusterInfo() 1444 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) 1445 return self._MultiNodeCall(node_list, "hypervisor_validate_params", 1446 [hvname, hv_full])
1447 1448 @_RpcTimeout(_TMO_NORMAL)
1449 - def call_x509_cert_create(self, node, validity):
1450 """Creates a new X509 certificate for SSL/TLS. 1451 1452 This is a single-node call. 1453 1454 @type validity: int 1455 @param validity: Validity in seconds 1456 1457 """ 1458 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1459 1460 @_RpcTimeout(_TMO_NORMAL)
1461 - def call_x509_cert_remove(self, node, name):
1462 """Removes a X509 certificate. 1463 1464 This is a single-node call. 1465 1466 @type name: string 1467 @param name: Certificate name 1468 1469 """ 1470 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1471 1472 @_RpcTimeout(_TMO_NORMAL)
1473 - def call_import_start(self, node, opts, instance, dest, dest_args):
1474 """Starts a listener for an import. 1475 1476 This is a single-node call. 1477 1478 @type node: string 1479 @param node: Node name 1480 @type instance: C{objects.Instance} 1481 @param instance: Instance object 1482 1483 """ 1484 return self._SingleNodeCall(node, "import_start", 1485 [opts.ToDict(), 1486 self._InstDict(instance), dest, 1487 _EncodeImportExportIO(dest, dest_args)])
1488 1489 @_RpcTimeout(_TMO_NORMAL)
1490 - def call_export_start(self, node, opts, host, port, 1491 instance, source, source_args):
1492 """Starts an export daemon. 1493 1494 This is a single-node call. 1495 1496 @type node: string 1497 @param node: Node name 1498 @type instance: C{objects.Instance} 1499 @param instance: Instance object 1500 1501 """ 1502 return self._SingleNodeCall(node, "export_start", 1503 [opts.ToDict(), host, port, 1504 self._InstDict(instance), source, 1505 _EncodeImportExportIO(source, source_args)])
1506 1507 @_RpcTimeout(_TMO_FAST)
1508 - def call_impexp_status(self, node, names):
1509 """Gets the status of an import or export. 1510 1511 This is a single-node call. 1512 1513 @type node: string 1514 @param node: Node name 1515 @type names: List of strings 1516 @param names: Import/export names 1517 @rtype: List of L{objects.ImportExportStatus} instances 1518 @return: Returns a list of the state of each named import/export or None if 1519 a status couldn't be retrieved 1520 1521 """ 1522 result = self._SingleNodeCall(node, "impexp_status", [names]) 1523 1524 if not result.fail_msg: 1525 decoded = [] 1526 1527 for i in result.payload: 1528 if i is None: 1529 decoded.append(None) 1530 continue 1531 decoded.append(objects.ImportExportStatus.FromDict(i)) 1532 1533 result.payload = decoded 1534 1535 return result
1536 1537 @_RpcTimeout(_TMO_NORMAL)
1538 - def call_impexp_abort(self, node, name):
1539 """Aborts an import or export. 1540 1541 This is a single-node call. 1542 1543 @type node: string 1544 @param node: Node name 1545 @type name: string 1546 @param name: Import/export name 1547 1548 """ 1549 return self._SingleNodeCall(node, "impexp_abort", [name])
1550 1551 @_RpcTimeout(_TMO_NORMAL)
1552 - def call_impexp_cleanup(self, node, name):
1553 """Cleans up after an import or export. 1554 1555 This is a single-node call. 1556 1557 @type node: string 1558 @param node: Node name 1559 @type name: string 1560 @param name: Import/export name 1561 1562 """ 1563 return self._SingleNodeCall(node, "impexp_cleanup", [name])
1564