1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49
50
51 import ganeti.http.client
52
53
54
55 _RPC_CONNECT_TIMEOUT = 5
56
57 _RPC_CLIENT_HEADERS = [
58 "Content-type: %s" % http.HTTP_APP_JSON,
59 "Expect:",
60 ]
61
62
63 _TMO_URGENT = 60
64 _TMO_FAST = 5 * 60
65 _TMO_NORMAL = 15 * 60
66 _TMO_SLOW = 3600
67 _TMO_4HRS = 4 * 3600
68 _TMO_1DAY = 86400
69
70
71
72
73
74
75
76
77 _TIMEOUTS = {
78 }
82 """Initializes the module-global HTTP client manager.
83
84 Must be called before using any RPC function and while exactly one thread is
85 running.
86
87 """
88
89
90
91 assert threading.activeCount() == 1, \
92 "Found more than one active thread when initializing pycURL"
93
94 logging.info("Using PycURL %s", pycurl.version)
95
96 pycurl.global_init(pycurl.GLOBAL_ALL)
97
100 """Stops the module-global HTTP client manager.
101
102 Must be called before quitting the program and while exactly one thread is
103 running.
104
105 """
106 pycurl.global_cleanup()
107
110 noded_cert = str(constants.NODED_CERT_FILE)
111
112 curl.setopt(pycurl.FOLLOWLOCATION, False)
113 curl.setopt(pycurl.CAINFO, noded_cert)
114 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115 curl.setopt(pycurl.SSL_VERIFYPEER, True)
116 curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117 curl.setopt(pycurl.SSLCERT, noded_cert)
118 curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119 curl.setopt(pycurl.SSLKEY, noded_cert)
120 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121
122
123
124
125 _threading = threading
130 """Returns a per-thread HTTP client pool.
131
132 @rtype: L{http.client.HttpClientPool}
133
134 """
135 try:
136 pool = self.hcp
137 except AttributeError:
138 pool = http.client.HttpClientPool(_ConfigRpcCurl)
139 self.hcp = pool
140
141 return pool
142
143
144
145 del _threading
146
147
148 _thread_local = _RpcThreadLocal()
152 """Timeout decorator.
153
154 When applied to a rpc call_* function, it updates the global timeout
155 table with the given function/timeout.
156
157 """
158 def decorator(f):
159 name = f.__name__
160 assert name.startswith("call_")
161 _TIMEOUTS[name[len("call_"):]] = secs
162 return f
163 return decorator
164
167 """RPC-wrapper decorator.
168
169 When applied to a function, it runs it with the RPC system
170 initialized, and it shuts down the system afterwards. This means the
171 function must be called without RPC being initialized.
172
173 """
174 def wrapper(*args, **kwargs):
175 Init()
176 try:
177 return fn(*args, **kwargs)
178 finally:
179 Shutdown()
180 return wrapper
181
184 """RPC Result class.
185
186 This class holds an RPC result. It is needed since in multi-node
187 calls we can't raise an exception just because one out of many
188 failed, and therefore we use this class to encapsulate the result.
189
190 @ivar data: the data payload, for successful results, or None
191 @ivar call: the name of the RPC call
192 @ivar node: the name of the node to which we made the call
193 @ivar offline: whether the operation failed because the node was
194 offline, as opposed to actual failure; offline=True will always
195 imply failed=True, in order to allow simpler checking if
196 the user doesn't care about the exact failure mode
197 @ivar fail_msg: the error message if the call failed
198
199 """
def __init__(self, data=None, failed=False, offline=False,
             call=None, node=None):
    """Build a result object from the raw outcome of one RPC call.

    @param data: the decoded response; for a call that went through it
        must be a two-element tuple/list C{(status, payload)}, for
        C{failed=True} it is used as the error description
    @type failed: boolean
    @param failed: whether the call itself failed
    @type offline: boolean
    @param offline: whether the node was skipped because it is marked
        offline (takes precedence over C{failed})
    @param call: the name of the RPC call
    @param node: the name of the node the call was made to

    """
    self.call = call
    self.node = node
    self.offline = offline

    # Start from the "nothing usable" state; only the branches below that
    # actually have something to store override these two attributes.
    self.data = None
    self.payload = None

    if offline:
        self.fail_msg = "Node is marked offline"
    elif failed:
        self.fail_msg = self._EnsureErr(data)
    else:
        # The raw data is kept even when it turns out to be malformed
        self.data = data
        if not isinstance(data, (tuple, list)):
            self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                             type(data))
        elif len(data) != 2:
            self.fail_msg = ("RPC layer error: invalid result length (%d), "
                             "expected 2" % len(data))
        elif data[0]:
            # Remote side reported success: second element is the payload
            self.fail_msg = None
            self.payload = data[1]
        else:
            # Remote side reported failure: second element is the error
            self.fail_msg = self._EnsureErr(data[1])

    # Sanity check: every documented attribute must exist on all paths
    for attr_name in ("call", "data", "fail_msg",
                      "node", "offline", "payload"):
        assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233
234 @staticmethod
236 """Helper to ensure we return a 'True' value for error."""
237 if val:
238 return val
239 else:
240 return "No error information"
241
def Raise(self, msg, prereq=False, ecode=None):
    """Raise an exception if this result describes a failed call.

    LU code can call this unconditionally instead of checking each
    result by hand; for a result without a failure message it simply
    returns.

    @param msg: text put in front of the failure message; if empty, a
        generic text naming the call and the node is used instead
    @type prereq: boolean
    @param prereq: if true raise L{errors.OpPrereqError}, otherwise
        L{errors.OpExecError}
    @param ecode: if not None, passed as second argument to the exception
    @raise errors.OpPrereqError: on failure, when C{prereq} is true
    @raise errors.OpExecError: on failure, when C{prereq} is false

    """
    if not self.fail_msg:
        return

    if msg:
        text = "%s: %s" % (msg, self.fail_msg)
    else:
        text = ("Call '%s' to node '%s' has failed: %s" %
                (self.call, self.node, self.fail_msg))

    if not prereq:
        exc_class = errors.OpExecError
    else:
        exc_class = errors.OpPrereqError

    # The error code is only handed over when the caller supplied one
    if ecode is None:
        raise exc_class(text)
    raise exc_class(text, ecode)
266
271 """Return addresses for given node names.
272
273 @type node_list: list
274 @param node_list: List of node names
275 @type ssc: class
276 @param ssc: SimpleStore class that is used to obtain node->ip mappings
277 @type nslookup_fn: callable
278 @param nslookup_fn: function used to do NS lookup
279 @rtype: list of addresses and/or None's
280 @returns: List of corresponding addresses, if found
281
282 """
283 ss = ssc()
284 iplist = ss.GetNodePrimaryIPList()
285 family = ss.GetPrimaryIPFamily()
286 addresses = []
287 ipmap = dict(entry.split() for entry in iplist)
288 for node in node_list:
289 address = ipmap.get(node)
290 if address is None:
291 address = nslookup_fn(node, family=family)
292 addresses.append(address)
293
294 return addresses
295
298 """RPC Client class.
299
300 This class, given a (remote) method name, a list of parameters and a
301 list of nodes, will contact (in parallel) all nodes, and return a
302 dict of results (key: node name, value: result).
303
304 One current bug is that generic failure is still signaled by
305 'False' result, which is not good. This overloading of values can
306 cause bugs.
307
308 """
310 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311 " timeouts table")
312 self.procedure = procedure
313 self.body = body
314 self.port = port
315 self._request = {}
316 self._address_lookup_fn = address_lookup_fn
317
def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Register several nodes as targets of this call.

    @type node_list: list
    @param node_list: names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: addresses matching C{node_list} position by
        position; when None they are obtained from the address lookup
        function given to the constructor
    @type read_timeout: int
    @param read_timeout: overrides the default timeout of the operation

    """
    addresses = address_list
    if addresses is None:
        # No addresses supplied, resolve all names in one go
        addresses = self._address_lookup_fn(node_list)

    assert len(addresses) == len(node_list), \
        "Name and address lists must have the same length"

    for pair in zip(node_list, addresses):
        self.ConnectNode(pair[0], pair[1], read_timeout=read_timeout)
339
def ConnectNode(self, name, address=None, read_timeout=None):
    """Register a single node as target of this call.

    Builds the HTTP PUT request for this client's procedure and stores
    it under the node name.

    @type name: str
    @param name: the node name
    @type address: str
    @param address: the node address; looked up through the address
        lookup function when None
    @type read_timeout: int
    @param read_timeout: overrides the default timeout of the operation,
        which otherwise comes from the timeouts table

    """
    if address is None:
        # Resolve just this one name
        resolved = self._address_lookup_fn([name])
        address = resolved[0]

    assert address is not None

    if read_timeout is None:
        read_timeout = _TIMEOUTS[self.procedure]

    host = str(address)
    url_path = str("/%s" % self.procedure)
    request = http.client.HttpClientRequest(host, self.port,
                                            http.HTTP_PUT, url_path,
                                            headers=_RPC_CLIENT_HEADERS,
                                            post_data=str(self.body),
                                            read_timeout=read_timeout)
    self._request[name] = request
366
368 """Call nodes and return results.
369
370 @rtype: list
371 @return: List of RPC results
372
373 """
374 if not http_pool:
375 http_pool = http.client.HttpClientPool(_ConfigRpcCurl)
376
377 http_pool.ProcessRequests(self._request.values())
378
379 results = {}
380
381 for name, req in self._request.iteritems():
382 if req.success and req.resp_status_code == http.HTTP_OK:
383 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384 node=name, call=self.procedure)
385 continue
386
387
388 if req.error:
389 msg = req.error
390 else:
391 msg = req.resp_body
392
393 logging.error("RPC error in %s from node %s: %s",
394 self.procedure, name, msg)
395 results[name] = RpcResult(data=msg, failed=True, node=name,
396 call=self.procedure)
397
398 return results
399
402 """Encodes import/export I/O information.
403
404 """
405 if ieio == constants.IEIO_RAW_DISK:
406 assert len(ieioargs) == 1
407 return (ieioargs[0].ToDict(), )
408
409 if ieio == constants.IEIO_SCRIPT:
410 assert len(ieioargs) == 2
411 return (ieioargs[0].ToDict(), ieioargs[1])
412
413 return ieioargs
414
417 """RPC runner class"""
418
420 """Initializes the RPC runner.
421
422 @type cfg: C{config.ConfigWriter}
423 @param cfg: the configuration object that will be used to get data
424 about the cluster
425
426 """
427 self._cfg = cfg
428 self.port = netutils.GetDaemonPort(constants.NODED)
429
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Serialize an instance to a dict with parameters filled in.

    Starts from the instance's own ToDict() output and replaces the
    hypervisor, backend, OS and NIC parameters with the values obtained
    from the cluster object, applying the optional overrides on top.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: overrides for the hypervisor parameters
    @type bep: dict or None
    @param bep: overrides for the backend parameters
    @type osp: dict or None
    @param osp: overrides for the OS parameters
    @rtype: dict
    @return: the instance dict with filled parameters

    """
    result = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hvparams = cluster.FillHV(instance)
    if hvp is not None:
        hvparams.update(hvp)
    result["hvparams"] = hvparams

    beparams = cluster.FillBE(instance)
    if bep is not None:
        beparams.update(bep)
    result["beparams"] = beparams

    osparams = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
        osparams.update(osp)
    result["osparams"] = osparams

    # Each NIC gets its own parameters layered over the cluster defaults
    for nic in result["nics"]:
        defaults = cluster.nicparams[constants.PP_DEFAULT]
        nic['nicparams'] = objects.FillDict(defaults, nic['nicparams'])

    return result
465
def _ConnectList(self, client, node_list, call, read_timeout=None):
    """Connect a client to a list of nodes, skipping offline ones.

    Nodes present in the configuration and marked offline are not
    contacted; a ready-made offline result is produced for them. Nodes
    unknown to the configuration are passed on with a None address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node_list: list
    @param node_list: the nodes we should connect
    @type call: string
    @param call: the name of the remote procedure call, used to fill in
        the results of offline nodes
    @type read_timeout: int
    @param read_timeout: overrides the default read timeout for the
        given operation
    @rtype: dict
    @return: offline results, keyed by node name

    """
    known = self._cfg.GetAllNodesInfo()
    offline_results = {}
    targets = []
    for node in node_list:
        if node not in known:
            address = None
        elif known[node].offline:
            offline_results[node] = RpcResult(node=node, offline=True,
                                              call=call)
            continue
        else:
            address = known[node].primary_ip
        targets.append((node, address))

    if targets:
        names = [entry[0] for entry in targets]
        addresses = [entry[1] for entry in targets]
        client.ConnectList(names, address_list=addresses,
                           read_timeout=read_timeout)
    return offline_results
499
def _ConnectNode(self, client, node, call, read_timeout=None):
    """Connect a client to one node unless that node is offline.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node: str
    @param node: the node we should connect
    @type call: string
    @param call: the name of the remote procedure call, used to fill in
        the result of an offline node
    @type read_timeout: int
    @param read_timeout: overrides the default read timeout for the
        given operation
    @return: an offline L{RpcResult} if the node is marked offline,
        otherwise None after the node was added to the client

    """
    info = self._cfg.GetNodeInfo(node)
    if info is None:
        # Not in the configuration, let the client resolve the address
        address = None
    elif info.offline:
        return RpcResult(node=node, offline=True, call=call)
    else:
        address = info.primary_ip

    client.ConnectNode(node, address=address, read_timeout=read_timeout)
    return None
523
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Run one procedure on several nodes and collect the results.

    @param node_list: the nodes to call
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON as the request body
    @param read_timeout: overrides the default read timeout
    @rtype: dict
    @return: results keyed by node name, including the results generated
        for offline nodes

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    # Offline nodes already have their result here
    results = self._ConnectList(client, node_list, procedure,
                                read_timeout=read_timeout)
    results.update(client.GetResults())
    return results
534
535 @classmethod
536 - def _StaticMultiNodeCall(cls, node_list, procedure, args,
537 address_list=None, read_timeout=None):
546
548 """Helper for making a single-node call
549
550 """
551 body = serializer.DumpJson(args, indent=False)
552 c = Client(procedure, body, self.port)
553 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554 if result is None:
555
556 result = c.GetResults()[node]
557 return result
558
559 @classmethod
568
569 @staticmethod
571 """Compresses a string for transport over RPC.
572
573 Small amounts of data are not compressed.
574
575 @type data: str
576 @param data: Data
577 @rtype: tuple
578 @return: Encoded data to send
579
580 """
581
582 if len(data) < 512:
583 return (constants.RPC_ENCODING_NONE, data)
584
585
586 return (constants.RPC_ENCODING_ZLIB_BASE64,
587 base64.b64encode(zlib.compress(data, 3)))
588
589
590
591
592
593 @_RpcTimeout(_TMO_URGENT)
595 """Gets the sizes of requested block devices present on a node
596
597 This is a multi-node call.
598
599 """
600 return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601
602 @_RpcTimeout(_TMO_URGENT)
604 """Gets the logical volumes present in a given volume group.
605
606 This is a multi-node call.
607
608 """
609 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610
611 @_RpcTimeout(_TMO_URGENT)
613 """Gets the volume group list.
614
615 This is a multi-node call.
616
617 """
618 return self._MultiNodeCall(node_list, "vg_list", [])
619
620 @_RpcTimeout(_TMO_NORMAL)
622 """Get list of storage units.
623
624 This is a multi-node call.
625
626 """
627 return self._MultiNodeCall(node_list, "storage_list",
628 [su_name, su_args, name, fields])
629
630 @_RpcTimeout(_TMO_NORMAL)
632 """Modify a storage unit.
633
634 This is a single-node call.
635
636 """
637 return self._SingleNodeCall(node, "storage_modify",
638 [su_name, su_args, name, changes])
639
640 @_RpcTimeout(_TMO_NORMAL)
642 """Executes an operation on a storage unit.
643
644 This is a single-node call.
645
646 """
647 return self._SingleNodeCall(node, "storage_execute",
648 [su_name, su_args, name, op])
649
650 @_RpcTimeout(_TMO_URGENT)
652 """Checks if a node has all the bridges given.
653
654 This method checks if all bridges given in the bridges_list are
655 present on the remote node, so that an instance that uses interfaces
656 on those bridges can be started.
657
658 This is a single-node call.
659
660 """
661 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662
663 @_RpcTimeout(_TMO_NORMAL)
665 """Starts an instance.
666
667 This is a single-node call.
668
669 """
670 idict = self._InstDict(instance, hvp=hvp, bep=bep)
671 return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672
673 @_RpcTimeout(_TMO_NORMAL)
675 """Stops an instance.
676
677 This is a single-node call.
678
679 """
680 return self._SingleNodeCall(node, "instance_shutdown",
681 [self._InstDict(instance), timeout])
682
683 @_RpcTimeout(_TMO_NORMAL)
685 """Gather the information necessary to prepare an instance migration.
686
687 This is a single-node call.
688
689 @type node: string
690 @param node: the node on which the instance is currently running
691 @type instance: C{objects.Instance}
692 @param instance: the instance definition
693
694 """
695 return self._SingleNodeCall(node, "migration_info",
696 [self._InstDict(instance)])
697
698 @_RpcTimeout(_TMO_NORMAL)
700 """Prepare a node to accept an instance.
701
702 This is a single-node call.
703
704 @type node: string
705 @param node: the target node for the migration
706 @type instance: C{objects.Instance}
707 @param instance: the instance definition
708 @type info: opaque/hypervisor specific (string/data)
709 @param info: result for the call_migration_info call
710 @type target: string
711 @param target: target hostname (usually ip address) (on the node itself)
712
713 """
714 return self._SingleNodeCall(node, "accept_instance",
715 [self._InstDict(instance), info, target])
716
717 @_RpcTimeout(_TMO_NORMAL)
719 """Finalize any target-node migration specific operation.
720
721 This is called both in case of a successful migration and in case of error
722 (in which case it should abort the migration).
723
724 This is a single-node call.
725
726 @type node: string
727 @param node: the target node for the migration
728 @type instance: C{objects.Instance}
729 @param instance: the instance definition
730 @type info: opaque/hypervisor specific (string/data)
731 @param info: result for the call_migration_info call
732 @type success: boolean
733 @param success: whether the migration was a success or a failure
734
735 """
736 return self._SingleNodeCall(node, "finalize_migration",
737 [self._InstDict(instance), info, success])
738
739 @_RpcTimeout(_TMO_SLOW)
741 """Migrate an instance.
742
743 This is a single-node call.
744
745 @type node: string
746 @param node: the node on which the instance is currently running
747 @type instance: C{objects.Instance}
748 @param instance: the instance definition
749 @type target: string
750 @param target: the target node name
751 @type live: boolean
752 @param live: whether the migration should be done live or not (the
753 interpretation of this parameter is left to the hypervisor)
754
755 """
756 return self._SingleNodeCall(node, "instance_migrate",
757 [self._InstDict(instance), target, live])
758
759 @_RpcTimeout(_TMO_NORMAL)
761 """Reboots an instance.
762
763 This is a single-node call.
764
765 """
766 return self._SingleNodeCall(node, "instance_reboot",
767 [self._InstDict(inst), reboot_type,
768 shutdown_timeout])
769
770 @_RpcTimeout(_TMO_1DAY)
772 """Installs an OS on the given instance.
773
774 This is a single-node call.
775
776 """
777 return self._SingleNodeCall(node, "instance_os_add",
778 [self._InstDict(inst, osp=osparams),
779 reinstall, debug])
780
781 @_RpcTimeout(_TMO_SLOW)
783 """Run the OS rename script for an instance.
784
785 This is a single-node call.
786
787 """
788 return self._SingleNodeCall(node, "instance_run_rename",
789 [self._InstDict(inst), old_name, debug])
790
791 @_RpcTimeout(_TMO_URGENT)
793 """Returns information about a single instance.
794
795 This is a single-node call.
796
797 @type node: string
798 @param node: the node to query
799 @type instance: string
800 @param instance: the instance name
801 @type hname: string
802 @param hname: the hypervisor type of the instance
803
804 """
805 return self._SingleNodeCall(node, "instance_info", [instance, hname])
806
807 @_RpcTimeout(_TMO_NORMAL)
809 """Checks whether the given instance can be migrated.
810
811 This is a single-node call.
812
813 @param node: the node to query
814 @type instance: L{objects.Instance}
815 @param instance: the instance to check
816
817
818 """
819 return self._SingleNodeCall(node, "instance_migratable",
820 [self._InstDict(instance)])
821
822 @_RpcTimeout(_TMO_URGENT)
824 """Returns information about all instances on the given nodes.
825
826 This is a multi-node call.
827
828 @type node_list: list
829 @param node_list: the list of nodes to query
830 @type hypervisor_list: list
831 @param hypervisor_list: the hypervisors to query for instances
832
833 """
834 return self._MultiNodeCall(node_list, "all_instances_info",
835 [hypervisor_list])
836
837 @_RpcTimeout(_TMO_URGENT)
839 """Returns the list of running instances on a given node.
840
841 This is a multi-node call.
842
843 @type node_list: list
844 @param node_list: the list of nodes to query
845 @type hypervisor_list: list
846 @param hypervisor_list: the hypervisors to query for instances
847
848 """
849 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850
851 @_RpcTimeout(_TMO_FAST)
854 """Do a TcpPing on the remote node
855
856 This is a single-node call.
857
858 """
859 return self._SingleNodeCall(node, "node_tcp_ping",
860 [source, target, port, timeout,
861 live_port_needed])
862
863 @_RpcTimeout(_TMO_FAST)
865 """Checks if a node has the given IP address.
866
867 This is a single-node call.
868
869 """
870 return self._SingleNodeCall(node, "node_has_ip_address", [address])
871
872 @_RpcTimeout(_TMO_URGENT)
874 """Return node information.
875
876 This will return memory information and volume group size and free
877 space.
878
879 This is a multi-node call.
880
881 @type node_list: list
882 @param node_list: the list of nodes to query
883 @type vg_name: C{string}
884 @param vg_name: the name of the volume group to ask for disk space
885 information
886 @type hypervisor_type: C{str}
887 @param hypervisor_type: the name of the hypervisor to ask for
888 memory information
889
890 """
891 return self._MultiNodeCall(node_list, "node_info",
892 [vg_name, hypervisor_type])
893
894 @_RpcTimeout(_TMO_NORMAL)
896 """Modify hosts file with name
897
898 @type node: string
899 @param node: The node to call
900 @type mode: string
901 @param mode: The mode to operate. Currently "add" or "remove"
902 @type name: string
903 @param name: The host name to be modified
904 @type ip: string
905 @param ip: The ip of the entry (just valid if mode is "add")
906
907 """
908 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909
910 @_RpcTimeout(_TMO_NORMAL)
912 """Request verification of given parameters.
913
914 This is a multi-node call.
915
916 """
917 return self._MultiNodeCall(node_list, "node_verify",
918 [checkdict, cluster_name])
919
920 @classmethod
921 @_RpcTimeout(_TMO_FAST)
923 """Tells a node to activate itself as a master.
924
925 This is a single-node call.
926
927 """
928 return cls._StaticSingleNodeCall(node, "node_start_master",
929 [start_daemons, no_voting])
930
931 @classmethod
932 @_RpcTimeout(_TMO_FAST)
934 """Tells a node to demote itself from master status.
935
936 This is a single-node call.
937
938 """
939 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
940
941 @classmethod
942 @_RpcTimeout(_TMO_URGENT)
944 """Query master info.
945
946 This is a multi-node call.
947
948 """
949
950 return cls._StaticMultiNodeCall(node_list, "master_info", [])
951
952 @classmethod
953 @_RpcTimeout(_TMO_URGENT)
955 """Query node version.
956
957 This is a multi-node call.
958
959 """
960 return cls._StaticMultiNodeCall(node_list, "version", [])
961
962 @_RpcTimeout(_TMO_NORMAL)
964 """Request creation of a given block device.
965
966 This is a single-node call.
967
968 """
969 return self._SingleNodeCall(node, "blockdev_create",
970 [bdev.ToDict(), size, owner, on_primary, info])
971
972 @_RpcTimeout(_TMO_SLOW)
974 """Request wipe at given offset with given size of a block device.
975
976 This is a single-node call.
977
978 """
979 return self._SingleNodeCall(node, "blockdev_wipe",
980 [bdev.ToDict(), offset, size])
981
982 @_RpcTimeout(_TMO_NORMAL)
984 """Request removal of a given block device.
985
986 This is a single-node call.
987
988 """
989 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
990
991 @_RpcTimeout(_TMO_NORMAL)
993 """Request rename of the given block devices.
994
995 This is a single-node call.
996
997 """
998 return self._SingleNodeCall(node, "blockdev_rename",
999 [(d.ToDict(), uid) for d, uid in devlist])
1000
1001 @_RpcTimeout(_TMO_NORMAL)
1003 """Request a pause/resume of given block device.
1004
1005 This is a single-node call.
1006
1007 """
1008 return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1009 [[bdev.ToDict() for bdev in disks], pause])
1010
1011 @_RpcTimeout(_TMO_NORMAL)
1013 """Request assembling of a given block device.
1014
1015 This is a single-node call.
1016
1017 """
1018 return self._SingleNodeCall(node, "blockdev_assemble",
1019 [disk.ToDict(), owner, on_primary, idx])
1020
1021 @_RpcTimeout(_TMO_NORMAL)
1023 """Request shutdown of a given block device.
1024
1025 This is a single-node call.
1026
1027 """
1028 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1029
1030 @_RpcTimeout(_TMO_NORMAL)
1032 """Request adding a list of children to a (mirroring) device.
1033
1034 This is a single-node call.
1035
1036 """
1037 return self._SingleNodeCall(node, "blockdev_addchildren",
1038 [bdev.ToDict(),
1039 [disk.ToDict() for disk in ndevs]])
1040
1041 @_RpcTimeout(_TMO_NORMAL)
1043 """Request removing a list of children from a (mirroring) device.
1044
1045 This is a single-node call.
1046
1047 """
1048 return self._SingleNodeCall(node, "blockdev_removechildren",
1049 [bdev.ToDict(),
1050 [disk.ToDict() for disk in ndevs]])
1051
1052 @_RpcTimeout(_TMO_NORMAL)
1054 """Request status of a (mirroring) device.
1055
1056 This is a single-node call.
1057
1058 """
1059 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1060 [dsk.ToDict() for dsk in disks])
1061 if not result.fail_msg:
1062 result.payload = [objects.BlockDevStatus.FromDict(i)
1063 for i in result.payload]
1064 return result
1065
1066 @_RpcTimeout(_TMO_NORMAL)
1068 """Request status of (mirroring) devices from multiple nodes.
1069
1070 This is a multi-node call.
1071
1072 """
1073 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1074 [dict((name, [dsk.ToDict() for dsk in disks])
1075 for name, disks in node_disks.items())])
1076 for nres in result.values():
1077 if nres.fail_msg:
1078 continue
1079
1080 for idx, (success, status) in enumerate(nres.payload):
1081 if success:
1082 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1083
1084 return result
1085
1086 @_RpcTimeout(_TMO_NORMAL)
1088 """Request identification of a given block device.
1089
1090 This is a single-node call.
1091
1092 """
1093 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1094 if not result.fail_msg and result.payload is not None:
1095 result.payload = objects.BlockDevStatus.FromDict(result.payload)
1096 return result
1097
1098 @_RpcTimeout(_TMO_NORMAL)
1100 """Closes the given block devices.
1101
1102 This is a single-node call.
1103
1104 """
1105 params = [instance_name, [cf.ToDict() for cf in disks]]
1106 return self._SingleNodeCall(node, "blockdev_close", params)
1107
1108 @_RpcTimeout(_TMO_NORMAL)
1110 """Returns the size of the given disks.
1111
1112 This is a single-node call.
1113
1114 """
1115 params = [[cf.ToDict() for cf in disks]]
1116 return self._SingleNodeCall(node, "blockdev_getsize", params)
1117
1118 @_RpcTimeout(_TMO_NORMAL)
1120 """Disconnects the network of the given drbd devices.
1121
1122 This is a multi-node call.
1123
1124 """
1125 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1126 [nodes_ip, [cf.ToDict() for cf in disks]])
1127
1128 @_RpcTimeout(_TMO_NORMAL)
1131 """Connects the network of the given drbd devices.
1132
1133 This is a multi-node call.
1134
1135 """
1136 return self._MultiNodeCall(node_list, "drbd_attach_net",
1137 [nodes_ip, [cf.ToDict() for cf in disks],
1138 instance_name, multimaster])
1139
1140 @_RpcTimeout(_TMO_SLOW)
1142 """Waits until the synchronization of drbd devices is complete.
1143
1144 This is a multi-node call.
1145
1146 """
1147 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1148 [nodes_ip, [cf.ToDict() for cf in disks]])
1149
1150 @_RpcTimeout(_TMO_URGENT)
1152 """Gets drbd helper.
1153
1154 This is a multi-node call.
1155
1156 """
1157 return self._MultiNodeCall(node_list, "drbd_helper", [])
1158
1159 @classmethod
1160 @_RpcTimeout(_TMO_NORMAL)
1162 """Upload a file.
1163
1164 The node will refuse the operation in case the file is not on the
1165 approved file list.
1166
1167 This is a multi-node call.
1168
1169 @type node_list: list
1170 @param node_list: the list of node names to upload to
1171 @type file_name: str
1172 @param file_name: the filename to upload
1173 @type address_list: list or None
1174 @keyword address_list: an optional list of node addresses, in order
1175 to optimize the RPC speed
1176
1177 """
1178 file_contents = utils.ReadFile(file_name)
1179 data = cls._Compress(file_contents)
1180 st = os.stat(file_name)
1181 getents = runtime.GetEnts()
1182 params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1183 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1184 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1185 address_list=address_list)
1186
1187 @classmethod
1188 @_RpcTimeout(_TMO_NORMAL)
1190 """Write ssconf files.
1191
1192 This is a multi-node call.
1193
1194 """
1195 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1196
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
    """Run the out-of-band program on a node.

    This is a single-node call.

    @param node: the node that executes the call
    @param oob_program: passed through to the remote "run_oob" procedure
    @param command: passed through to the remote "run_oob" procedure
    @param remote_node: passed through to the remote "run_oob" procedure
    @param timeout: passed through to the remote "run_oob" procedure

    """
    params = [oob_program, command, remote_node, timeout]
    return self._SingleNodeCall(node, "run_oob", params)
1206
1207 @_RpcTimeout(_TMO_FAST)
1209 """Request a diagnose of OS definitions.
1210
1211 This is a multi-node call.
1212
1213 """
1214 return self._MultiNodeCall(node_list, "os_diagnose", [])
1215
1216 @_RpcTimeout(_TMO_FAST)
1218 """Returns an OS definition.
1219
1220 This is a single-node call.
1221
1222 """
1223 result = self._SingleNodeCall(node, "os_get", [name])
1224 if not result.fail_msg and isinstance(result.payload, dict):
1225 result.payload = objects.OS.FromDict(result.payload)
1226 return result
1227
1228 @_RpcTimeout(_TMO_FAST)
1230 """Run a validation routine for a given OS.
1231
1232 This is a multi-node call.
1233
1234 """
1235 return self._MultiNodeCall(nodes, "os_validate",
1236 [required, name, checks, params])
1237
1238 @_RpcTimeout(_TMO_NORMAL)
1240 """Call the hooks runner.
1241
1242 Args:
1243 - op: the OpCode instance
1244 - env: a dictionary with the environment
1245
1246 This is a multi-node call.
1247
1248 """
1249 params = [hpath, phase, env]
1250 return self._MultiNodeCall(node_list, "hooks_runner", params)
1251
1252 @_RpcTimeout(_TMO_NORMAL)
1254 """Call an iallocator on a remote node
1255
1256 Args:
1257 - name: the iallocator name
1258 - input: the json-encoded input string
1259
1260 This is a single-node call.
1261
1262 """
1263 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1264
1265 @_RpcTimeout(_TMO_NORMAL)
1267 """Request growing of the given block device by a given amount.
1268
1269 This is a single-node call.
1270
1271 """
1272 return self._SingleNodeCall(node, "blockdev_grow",
1273 [cf_bdev.ToDict(), amount, dryrun])
1274
1275 @_RpcTimeout(_TMO_1DAY)
1278 """Export a given disk to another node.
1279
1280 This is a single-node call.
1281
1282 """
1283 return self._SingleNodeCall(node, "blockdev_export",
1284 [cf_bdev.ToDict(), dest_node, dest_path,
1285 cluster_name])
1286
1287 @_RpcTimeout(_TMO_NORMAL)
1289 """Request a snapshot of the given block device.
1290
1291 This is a single-node call; cf_bdev is sent in its ToDict() form.
1292
1293 """
1294 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1295
1296 @_RpcTimeout(_TMO_NORMAL)
1298 """Request the completion of an export operation.
1299
1300 This writes the export config file, etc.
1301
1302 This is a single-node call; snap_disks entries are serialized unless boolean.
1303
1304 """
1305 flat_disks = []
1306 for disk in snap_disks:
1307 if isinstance(disk, bool):
1308 flat_disks.append(disk)
1309 else:
1310 flat_disks.append(disk.ToDict())
1311
1312 return self._SingleNodeCall(node, "finalize_export",
1313 [self._InstDict(instance), flat_disks])
1314
1315 @_RpcTimeout(_TMO_FAST)
1317 """Query the export information stored in a given path.
1318
1319 This is a single-node call ("export_info" RPC).
1320
1321 """
1322 return self._SingleNodeCall(node, "export_info", [path])
1323
1324 @_RpcTimeout(_TMO_FAST)
1326 """Get the stored exports list from the given nodes.
1327
1328 This is a multi-node call ("export_list" RPC, sent without arguments).
1329
1330 """
1331 return self._MultiNodeCall(node_list, "export_list", [])
1332
1333 @_RpcTimeout(_TMO_FAST)
1335 """Request removal of a given export.
1336
1337 This is a single-node call ("export_remove" RPC).
1338
1339 """
1340 return self._SingleNodeCall(node, "export_remove", [export])
1341
1342 @classmethod
1343 @_RpcTimeout(_TMO_NORMAL)
1345 """Request a node to clean the cluster information it has.
1346
1347 This will remove the configuration information from the ganeti data
1348 dir.
1349
1350 This is a single-node call; modify_ssh_setup is forwarded to the node.
1351
1352 """
1353 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1354 [modify_ssh_setup])
1355
1356 @_RpcTimeout(_TMO_FAST)
1358 """Get all volumes on the given node(s).
1359
1360 This is a multi-node call ("node_volumes" RPC, sent without arguments).
1361
1362 """
1363 return self._MultiNodeCall(node_list, "node_volumes", [])
1364
1365 @_RpcTimeout(_TMO_FAST)
1367 """Demote a node from the master candidate role.
1368
1369 This is a single-node call ("node_demote_from_mc" RPC, sent without arguments).
1370
1371 """
1372 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1373
1374 @_RpcTimeout(_TMO_NORMAL)
1376 """Try to powercycle a node.
1377
1378 This is a single-node call; hypervisor is forwarded to the node.
1379
1380 """
1381 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1382
1383 @_RpcTimeout(None)
1385 """Sleep for a fixed time on given node(s).
1386
1387 This is a multi-node call; the read timeout used is int(duration + 5).
1388
1389 """
1390 return self._MultiNodeCall(node_list, "test_delay", [duration],
1391 read_timeout=int(duration + 5))
1392
1393 @_RpcTimeout(_TMO_FAST)
1395 """Create the given file storage directory.
1396
1397 This is a single-node call ("file_storage_dir_create" RPC).
1398
1399 """
1400 return self._SingleNodeCall(node, "file_storage_dir_create",
1401 [file_storage_dir])
1402
1403 @_RpcTimeout(_TMO_FAST)
1405 """Remove the given file storage directory.
1406
1407 This is a single-node call ("file_storage_dir_remove" RPC).
1408
1409 """
1410 return self._SingleNodeCall(node, "file_storage_dir_remove",
1411 [file_storage_dir])
1412
1413 @_RpcTimeout(_TMO_FAST)
1416 """Rename a file storage directory.
1417
1418 This is a single-node call; the old and new directory are sent in that order.
1419
1420 """
1421 return self._SingleNodeCall(node, "file_storage_dir_rename",
1422 [old_file_storage_dir, new_file_storage_dir])
1423
1424 @classmethod
1425 @_RpcTimeout(_TMO_URGENT)
1427 """Update a job queue file on the given nodes.
1428
1429 This is a multi-node call; content is passed through cls._Compress first.
1430
1431 """
1432 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1433 [file_name, cls._Compress(content)],
1434 address_list=address_list)
1435
1436 @classmethod
1437 @_RpcTimeout(_TMO_NORMAL)
1439 """Purge the job queue on a node.
1440
1441 This is a single-node call ("jobqueue_purge" RPC, sent without arguments).
1442
1443 """
1444 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1445
1446 @classmethod
1447 @_RpcTimeout(_TMO_URGENT)
1449 """Rename a job queue file.
1450
1451 This is a multi-node call; rename is passed as the whole RPC argument list.
1452
1453 """
1454 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1455 address_list=address_list)
1456
1457 @_RpcTimeout(_TMO_NORMAL)
1459 """Validate the hypervisor params.
1460
1461 This is a multi-node call; hvparams is merged with the cluster's via FillDict.
1462
1463 @type node_list: list
1464 @param node_list: the list of nodes to query
1465 @type hvname: string
1466 @param hvname: the hypervisor name
1467 @type hvparams: dict
1468 @param hvparams: the hypervisor parameters to be validated
1469
1470 """
1471 cluster = self._cfg.GetClusterInfo()
1472 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1473 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1474 [hvname, hv_full])
1475
1476 @_RpcTimeout(_TMO_NORMAL)
1478 """Create a new X509 certificate for SSL/TLS.
1479
1480 This is a single-node call ("x509_cert_create" RPC).
1481
1482 @type validity: int
1483 @param validity: Validity in seconds
1484
1485 """
1486 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1487
1488 @_RpcTimeout(_TMO_NORMAL)
1490 """Remove an X509 certificate.
1491
1492 This is a single-node call ("x509_cert_remove" RPC).
1493
1494 @type name: string
1495 @param name: Certificate name
1496
1497 """
1498 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1499
1500 @_RpcTimeout(_TMO_NORMAL)
1501 - def call_import_start(self, node, opts, instance, component,
1502 dest, dest_args):
1503 """Starts a listener for an import.
1504
1505 This is a single-node call; dest_args is encoded via _EncodeImportExportIO.
1506
1507 @type node: string
1508 @param node: Node name
1509 @type instance: C{objects.Instance}
1510 @param instance: Instance object
1511 @type component: string
1512 @param component: which part of the instance is being imported
1513
1514 """
1515 return self._SingleNodeCall(node, "import_start",
1516 [opts.ToDict(),
1517 self._InstDict(instance), component, dest,
1518 _EncodeImportExportIO(dest, dest_args)])
1519
1520 @_RpcTimeout(_TMO_NORMAL)
1521 - def call_export_start(self, node, opts, host, port,
1522 instance, component, source, source_args):
1523 """Starts an export daemon.
1524
1525 This is a single-node call; source_args is encoded via _EncodeImportExportIO.
1526
1527 @type node: string
1528 @param node: Node name
1529 @type instance: C{objects.Instance}
1530 @param instance: Instance object
1531 @type component: string
1532 @param component: which part of the instance is being exported
1533
1534 """
1535 return self._SingleNodeCall(node, "export_start",
1536 [opts.ToDict(), host, port,
1537 self._InstDict(instance),
1538 component, source,
1539 _EncodeImportExportIO(source, source_args)])
1540
1541 @_RpcTimeout(_TMO_FAST)
1543 """Gets the status of an import or export.
1544
1545 This is a single-node call.
1546
1547 @type node: string
1548 @param node: Node name
1549 @type names: List of strings
1550 @param names: Import/export names
1551 @rtype: RPC result whose payload is a list of L{objects.ImportExportStatus}
1552 @return: On success the payload holds one decoded status per payload entry;
1553 entries for which no status could be retrieved stay C{None}
1554
1555 """
1556 result = self._SingleNodeCall(node, "impexp_status", [names])
1557
1558 if not result.fail_msg:
1559 decoded = []
1560
1561 for i in result.payload:
1562 if i is None:
1563 decoded.append(None)
1564 continue
1565 decoded.append(objects.ImportExportStatus.FromDict(i))
1566
1567 result.payload = decoded
1568
1569 return result
1570
1571 @_RpcTimeout(_TMO_NORMAL)
1573 """Abort an import or export.
1574
1575 This is a single-node call ("impexp_abort" RPC).
1576
1577 @type node: string
1578 @param node: Node name
1579 @type name: string
1580 @param name: Import/export name
1581
1582 """
1583 return self._SingleNodeCall(node, "impexp_abort", [name])
1584
1585 @_RpcTimeout(_TMO_NORMAL)
1587 """Clean up after an import or export.
1588
1589 This is a single-node call ("impexp_cleanup" RPC).
1590
1591 @type node: string
1592 @param node: Node name
1593 @type name: string
1594 @param name: Import/export name
1595
1596 """
1597 return self._SingleNodeCall(node, "impexp_cleanup", [name])
1598